This is an automated email from the ASF dual-hosted git repository.

joerghoh pushed a commit to branch SLING-13021-concurrent-import
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit bac3ed9621eafa513352b28bbf5f2c2ad6d0d13b
Author: Joerg Hoh <[email protected]>
AuthorDate: Mon Dec 1 19:14:19 2025 +0100

    SLING-13201 delegate the import to an executor
---
 .../impl/subscriber/DistributionSubscriber.java    | 86 ++++++++++++++++++----
 .../impl/subscriber/SubscriberConfiguration.java   |  3 +
 .../journal/impl/subscriber/SubscriberTest.java    |  7 +-
 3 files changed, 81 insertions(+), 15 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index bda2a83..b056490 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -37,8 +37,17 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -130,6 +139,10 @@ public class DistributionSubscriber {
     private volatile boolean running = true;
 
     private LongSupplier catchAllDelay = catchAllDelays.get();
+    
+    private CompletionService<PackageMessageResult> completionService;
+    private ExecutorService importExecutor;
+    private AtomicLong totalNumberOfImportedMessages = new AtomicLong(0);
 
     private final Delay delay = new Delay();
        private AtomicReference<DistributionAgentState> state = new 
AtomicReference<DistributionAgentState>(DistributionAgentState.IDLE);
@@ -192,15 +205,32 @@ public class DistributionSubscriber {
         String assign = startOffset > 0 ? 
messagingProvider.assignTo(startOffset) : null;
 
         packagePoller = messagingProvider.createPoller(Topics.PACKAGE_TOPIC, 
Reset.latest, assign,
-                HandlerAdapter.create(PackageMessage.class, 
this::handlePackageMessage), HandlerAdapter.create(OffsetMessage.class, 
this::handleOffsetMessage));
+                HandlerAdapter.create(PackageMessage.class, 
this::delegateMessageToExecutor), HandlerAdapter.create(OffsetMessage.class, 
this::handleOffsetMessage));
 
         int announceDelay = 
Converters.standardConverter().convert(properties.get("announceDelay")).defaultValue(10000).to(Integer.class);
         announcer = new Announcer(subSlingId, subAgentName, queueNames,
                 messagingProvider.createSender(Topics.DISCOVERY_TOPIC), 
bookKeeper,
                 config.maxRetries(), config.editable(), announceDelay);
-
-        LOG.info("Started Subscriber agent={} at offset={}, subscribed to 
agent names {}, readyCheck={}", subAgentName, startOffset,
-                queueNames, config.subscriberIdleCheck());
+        
+        importExecutor = 
Executors.newFixedThreadPool(config.concurrentImportingThreads(), new 
ThreadFactory() {
+            
+            AtomicInteger id = new AtomicInteger(0);
+            
+            public Thread newThread(Runnable r) {
+                int no = id.incrementAndGet();
+                Thread t =  new Thread(r);
+                t.setName("DistributionSubscriber-importer-" + no);
+                return t;
+              }
+        });
+        completionService = new ExecutorCompletionService<>(importExecutor);
+
+        LOG.info("Started Subscriber agent={} at offset={}, subscribed to 
agent names {}, readyCheck={}, {} importing threads", 
+                subAgentName, 
+                startOffset,
+                queueNames, 
+                config.subscriberIdleCheck(),
+                config.concurrentImportingThreads());
     }
 
     private String getFirst(String[] agentNames) {
@@ -235,6 +265,7 @@ public class DistributionSubscriber {
 
         IOUtils.closeQuietly(announcer, packagePoller, idleReadyCheck, 
idleCheck, commandPoller);
         running = false;
+        importExecutor.shutdown();
         LOG.info("Stopped Subscriber agent {}, subscribed to Publisher agent 
names {} with package builder {}",
                 subAgentName, queueNames, pkgType);
     }
@@ -244,21 +275,33 @@ public class DistributionSubscriber {
         return (isBlocked) ? DistributionAgentState.BLOCKED : state.get();
     }
 
-    private void handlePackageMessage(MessageInfo info, PackageMessage 
message) {
-       boolean done = false;
-       while (!done && running) {
-               done = tryProcess(info, message);
+    private void delegateMessageToExecutor (MessageInfo info, PackageMessage 
message) {
+        // TODO: do we need the completionService? It queues every 
completed future
+        //  until it is taken, so we must take() them eventually ...
+        completionService.submit(() -> handlePackageMessage(info, message));
+        totalNumberOfImportedMessages.incrementAndGet();
+    }
+
+    private PackageMessageResult handlePackageMessage(MessageInfo info, 
PackageMessage message) {
+        PackageMessageResult result = null;
+        boolean done = false;
+        while (!done && running) {
+            result = tryProcess(info,message);
+            done = result.success;
         }
+        return result;
     }
 
-       public boolean tryProcess(MessageInfo info, PackageMessage message) {
+       public PackageMessageResult tryProcess(MessageInfo info, PackageMessage 
message) {
+           PackageMessageResult result = new PackageMessageResult();
                if (shouldSkip(info, message)) {
             try {
                 bookKeeper.skipPackage(info.getOffset());
             } catch (PersistenceException | LoginException e) {
                 LOG.warn("Error marking distribution package {} at offset={} 
as skipped", message, info.getOffset(), e);
             }
-            return true;
+            result.success = true;
+            return result;
         }
         subscriberMetrics.getPackageJournalDistributionDuration()
                .update((currentTimeMillis() - info.getCreateTime()), 
TimeUnit.MILLISECONDS);
@@ -269,7 +312,8 @@ public class DistributionSubscriber {
             // Precondition timed out. We only log this on info level as it is 
no error
             LOG.info(e.getMessage());
             delay.await(RETRY_DELAY_MILLIS);
-            return false;
+            result.success = false;
+            return result;
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             LOG.debug(e.getMessage());
@@ -277,11 +321,13 @@ public class DistributionSubscriber {
             // Catch all to prevent processing from stopping
             LOG.error("Error processing queue item", e);
             delay.await(catchAllDelay.getAsLong());
-            return false;
+            result.success = false;
+            return result;
         } finally {
             announcer.run();
         }
-        return true;
+        result.success = true;
+        return result;
        }
 
     private void handleOffsetMessage(MessageInfo info, OffsetMessage message) {
@@ -372,4 +418,18 @@ public class DistributionSubscriber {
         throw new PreConditionTimeoutException(msg);
     }
 
+    // This is just for testing -- wait until all async messages have been 
imported
+    public void waitForAllMessagesBeingImported() throws InterruptedException {
+        for (int i=0; i < totalNumberOfImportedMessages.get();i++) {
+            completionService.take();
+        }
+    }
+    
+    
+    class PackageMessageResult {
+        
+        public boolean success;
+        
+    }
+    
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
index bcf2e0b..d225948 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
@@ -70,4 +70,7 @@ public @interface SubscriberConfiguration {
 
     @AttributeDefinition(description = "Number of ms to wait before retrying 
to process a package.")
     int acceptableAgeDiffMs() default 120 * 1000;
+    
+    @AttributeDefinition(description = "Number of threads importing content 
concurrently")
+    int concurrentImportingThreads() default 1;
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 1bdbe19..88ade05 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -238,7 +238,7 @@ public class SubscriberTest {
     }
     
     @Test
-    public void testReceiveNotSubscribed() throws DistributionException {
+    public void testReceiveNotSubscribed() throws DistributionException, 
InterruptedException {
         assumeNoPrecondition();
         initSubscriber(Collections.singletonMap("agentNames", "dummy"));
         assertThat(subscriber.getState(), 
equalTo(DistributionAgentState.IDLE));
@@ -253,6 +253,7 @@ public class SubscriberTest {
         for (int c=0; c < BookKeeper.COMMIT_AFTER_NUM_SKIPPED; c++) {
             packageHandler.handle(info, message);
         }
+        subscriber.waitForAllMessagesBeingImported();
         assertThat(getStoredOffset(), equalTo(100l));
     }
     
@@ -271,7 +272,7 @@ public class SubscriberTest {
     }
 
     @Test
-    public void testImportPreAndPostProcessInvoked() throws 
DistributionException, ImportPostProcessException, ImportPreProcessException {
+    public void testImportPreAndPostProcessInvoked() throws 
DistributionException, ImportPostProcessException, ImportPreProcessException, 
InterruptedException {
         assumeNoPrecondition();
         initSubscriber();
         assertThat(subscriber.getState(), 
equalTo(DistributionAgentState.IDLE));
@@ -287,6 +288,8 @@ public class SubscriberTest {
         props.put(DISTRIBUTION_PACKAGE_ID, message.getPkgId());
         props.put(DISTRIBUTION_COMPONENT_NAME, message.getPubAgentName());
 
+        subscriber.waitForAllMessagesBeingImported();
+        
         verify(importPreProcessor, times(1)).process(props);
         verify(importPostProcessor, times(1)).process(props);
     }

Reply via email to