This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-10112
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 79e74b9aaa204807ec82db07f571d79a40ec9fe7
Author: Christian Schneider <[email protected]>
AuthorDate: Mon Feb 1 22:35:02 2021 +0100

    SLING-10112 - Only start import when CommandPoller idle
---
 .../journal/impl/subscriber/CommandPoller.java     | 11 +++-
 .../impl/subscriber/DistributionSubscriber.java    | 13 +++--
 .../journal/impl/subscriber/CommandPollerTest.java |  2 +-
 .../journal/impl/subscriber/SubscriberTest.java    | 66 +++++++++++++++++++---
 4 files changed, 79 insertions(+), 13 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index e602fee..9bcef69 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -21,6 +21,7 @@ package org.apache.sling.distribution.journal.impl.subscriber;
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 
 import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
@@ -38,11 +39,13 @@ public class CommandPoller implements Closeable {
     private final String subSlingId;
     private final String subAgentName;
     private final Closeable poller;
+    private final IdleCheck idleCheck;
     private final AtomicLong clearOffset = new AtomicLong(-1);
 
-    public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName) {
+    public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName, int idleMillies) {
         this.subSlingId = subSlingId;
         this.subAgentName = subAgentName;
+        this.idleCheck = new SubscriberIdle(idleMillies, new AtomicBoolean());
         this.poller = messagingProvider.createPoller(
                     topics.getCommandTopic(),
                     Reset.earliest,
@@ -55,12 +58,14 @@ public class CommandPoller implements Closeable {
     }
 
     private void handleCommandMessage(MessageInfo info, ClearCommand message) {
+        idleCheck.busy(0);
         if (!subSlingId.equals(message.getSubSlingId()) || !subAgentName.equals(message.getSubAgentName())) {
             LOG.debug("Skip command for subSlingId {}", message.getSubSlingId());
             return;
         }
 
         handleClearCommand(message.getOffset());
+        idleCheck.idle();
     }
 
     private void handleClearCommand(long offset) {
@@ -77,4 +82,8 @@ public class CommandPoller implements Closeable {
     public void close() {
         IOUtils.closeQuietly(poller);
     }
+
+    public boolean isIdle() {
+        return idleCheck.isIdle();
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 67512b2..8b1baec 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -84,6 +84,7 @@ public class DistributionSubscriber {
     private static final int PRECONDITION_TIMEOUT = 60;
     static int RETRY_DELAY = 5000;
     static int QUEUE_FETCH_DELAY = 1000;
+    private static final long COMMAND_NOT_IDLE_DELAY_MS = 200;
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
 
@@ -117,7 +118,7 @@ public class DistributionSubscriber {
     private volatile Closeable idleReadyCheck; //NOSONAR
     
     private volatile IdleCheck idleCheck; //NOSONAR
-    
+
     private Closeable packagePoller;
 
     private volatile CommandPoller commandPoller; //NOSONAR
@@ -152,13 +153,13 @@ public class DistributionSubscriber {
         requireNonNull(precondition);
         requireNonNull(bookKeeperFactory);
 
+        Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
         if (config.editable()) {
-            commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName);
+            commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, idleMillies);
         }
 
         if (config.subscriberIdleCheck()) {
             // Unofficial config (currently just for test)
-            Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
             AtomicBoolean readyHolder = subscriberReadyStore.getReadyHolder(subAgentName);
             
             idleCheck = new SubscriberIdle(idleMillies, readyHolder);
@@ -275,7 +276,11 @@ public class DistributionSubscriber {
         LOG.info("Started Queue processor");
         while (running) {
             try {
-                fetchAndProcessQueueItem();
+                if (commandPoller == null || commandPoller.isIdle()) {
+                    fetchAndProcessQueueItem();
+                } else {
+                    delay(COMMAND_NOT_IDLE_DELAY_MS);
+                }
             } catch (PreConditionTimeoutException e) {
                 // Precondition timed out. We only log this on info level as it is no error
                 LOG.info(e.getMessage());
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index c4be949..4816f14 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -147,7 +147,7 @@ public class CommandPollerTest {
                 Mockito.eq(Reset.earliest), 
                 handlerCaptor.capture()))
             .thenReturn(poller);
-        commandPoller = new CommandPoller(clientProvider, topics, SUB_SLING_ID, SUB_AGENT_NAME);
+        commandPoller = new CommandPoller(clientProvider, topics, SUB_SLING_ID, SUB_AGENT_NAME, 1000);
         commandHandler = handlerCaptor.getValue().getHandler();
     }
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 66dac34..d920205 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -71,6 +71,7 @@ import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory;
 import org.apache.sling.distribution.journal.bookkeeper.LocalStore;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
 import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
@@ -181,15 +182,25 @@ public class SubscriberTest {
     
     @Captor
     private ArgumentCaptor<HandlerAdapter<PackageMessage>> packageCaptor;
+    
+    @Captor
+    private ArgumentCaptor<HandlerAdapter<ClearCommand>> commandCaptor;
+    
+    @Captor
+    private ArgumentCaptor<PackageStatusMessage> statusMessageCaptor;
 
     @Mock
     private Closeable poller;
     
     @Mock
+    private Closeable commandPoller;
+    
+    @Mock
     private ServiceRegistration<DistributionAgent> reg;
     
     private MessageHandler<PackageMessage> packageHandler;
-
+    
+    private MessageHandler<ClearCommand> commandHandler;
 
     @Before
     public void before() {
@@ -208,11 +219,18 @@ public class SubscriberTest {
         when(clientProvider.<DiscoveryMessage>createSender(Mockito.eq(topics.getDiscoveryTopic()))).thenReturn(discoverySender);
 
         when(clientProvider.createPoller(
-                Mockito.anyString(),
+                Mockito.eq(topics.getPackageTopic()),
                 Mockito.eq(Reset.earliest), 
                 Mockito.anyString(),
                 packageCaptor.capture()))
             .thenReturn(poller);
+        
+        when(clientProvider.createPoller(
+                Mockito.eq(topics.getCommandTopic()),
+                Mockito.eq(Reset.earliest), 
+                commandCaptor.capture()))
+            .thenReturn(commandPoller);
+        
         when(context.registerService(Mockito.any(Class.class), eq(subscriber), Mockito.any(Dictionary.class))).thenReturn(reg);
 
         // you should call initSubscriber in each test method
@@ -261,10 +279,10 @@ public class SubscriberTest {
         sem.release();
         
         waitSubscriber(IDLE);
-        verify(statusSender, times(0)).accept(anyObject());
+        verifyNoStatusMessageSent();
     }
 
-       @Test
+    @Test
     public void testReceiveDelete() throws DistributionException, LoginException, PersistenceException {
         assumeNoPrecondition();
         initSubscriber();
@@ -281,9 +299,29 @@ public class SubscriberTest {
         sem.release();
         
         waitSubscriber(IDLE);
+        verifyNoStatusMessageSent();
         assertThat(getResource("/test"), nullValue());
     }
 
+    /**
+     *  We must make sure that a delete command in the queue is honored if possible.
+     */
+    @Test
+    public void testReceiveNotProcessedWhenDeleteCommandLate() throws Exception {
+        assumeNoPrecondition();
+        initSubscriber(ImmutableMap.of("editable", "true"));
+
+        MessageInfo info = createInfo(0l);
+        PackageMessage message = BASIC_ADD_PACKAGE;
+        packageHandler.handle(info, message);
+        
+        ClearCommand clearCommand = ClearCommand.builder().offset(10).subAgentName(SUB1_AGENT_NAME).subSlingId(SUB1_SLING_ID).build();
+        Thread.sleep(500);
+        commandHandler.handle(info, clearCommand);
+        
+        verifyStatusMessageSentWithStatus(Status.REMOVED);
+    }
+
     @Test
     public void testSendFailedStatus() throws DistributionException {
         assumeNoPrecondition();
@@ -295,7 +333,7 @@ public class SubscriberTest {
         PackageMessage message = BASIC_ADD_PACKAGE;
         packageHandler.handle(info, message);
         
-        verify(statusSender, timeout(10000).times(1)).accept(anyObject());
+        verifyStatusMessageSentWithStatus(Status.REMOVED_FAILED);
     }
 
     @Test
@@ -309,7 +347,7 @@ public class SubscriberTest {
         packageHandler.handle(info, message);
         
         waitSubscriber(IDLE);
-        verify(statusSender, timeout(10000).times(1)).accept(anyObject());
+        verifyStatusMessageSentWithStatus(Status.IMPORTED);
     }
 
     @Test
@@ -322,7 +360,7 @@ public class SubscriberTest {
         packageHandler.handle(info, message);
         
         await().until(this::getStatus, equalTo(PackageStatusMessage.Status.REMOVED));
-        verify(statusSender, timeout(10000).times(1)).accept(anyObject());
+        verifyStatusMessageSentWithStatus(Status.REMOVED);
     }
     
     @Test
@@ -354,6 +392,17 @@ public class SubscriberTest {
         sem.release();
     }
     
+    private void verifyNoStatusMessageSent() {
+        verify(statusSender, times(0)).accept(anyObject());
+    }
+
+    private PackageStatusMessage verifyStatusMessageSentWithStatus(Status expectedStatus) {
+        verify(statusSender, timeout(10000).times(1)).accept(statusMessageCaptor.capture());
+        PackageStatusMessage statusMessage = statusMessageCaptor.getValue();
+        assertThat(statusMessage.getStatus(), equalTo(expectedStatus));
+        return statusMessage;
+    }
+
     private OngoingStubbing<DistributionPackageInfo> whenInstallPackage() throws DistributionException {
         return when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class), Mockito.any(ByteArrayInputStream.class)));
     }
@@ -401,6 +450,9 @@ public class SubscriberTest {
         subscriber.bookKeeperFactory = bookKeeperFactory;
         subscriber.activate(config, context, props);
         packageHandler = packageCaptor.getValue().getHandler();
+        if ("true".equals(props.get("editable"))) {
+            commandHandler = commandCaptor.getValue().getHandler();
+        }
     }
 
     private void waitSubscriber(DistributionAgentState expectedState) {

Reply via email to