This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 697c9dd  SLING-10112 - Do not wait for command poller to be idle (#86)
697c9dd is described below

commit 697c9dd7ba83c5bf10e6a148aba971db1e8e7832
Author: Christian Schneider <[email protected]>
AuthorDate: Thu Jul 15 15:51:15 2021 +0200

    SLING-10112 - Do not wait for command poller to be idle (#86)
    
    * SLING-10112 - Do not wait for command poller to be idle
    
    * SLING-10112 - Fix import
---
 .../journal/impl/subscriber/CommandPoller.java        |  9 +--------
 .../impl/subscriber/DistributionSubscriber.java       | 10 ++--------
 .../journal/impl/subscriber/SubscriberIdle.java       |  4 ----
 .../journal/impl/subscriber/CommandPollerTest.java    |  2 +-
 .../journal/impl/subscriber/SubscriberIdleTest.java   |  6 ++++--
 .../journal/impl/subscriber/SubscriberTest.java       | 19 -------------------
 6 files changed, 8 insertions(+), 42 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index 22734a6..fcdb2b6 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -38,14 +38,12 @@ public class CommandPoller implements Closeable {
     private final String subSlingId;
     private final String subAgentName;
     private final Closeable poller;
-    private final IdleCheck idleCheck;
     private final AtomicLong clearOffset = new AtomicLong(-1);
     private final Runnable callback;
 
-    public CommandPoller(MessagingProvider messagingProvider, Topics topics, 
String subSlingId, String subAgentName, int idleMillies, Runnable callback) {
+    public CommandPoller(MessagingProvider messagingProvider, Topics topics, 
String subSlingId, String subAgentName, Runnable callback) {
         this.subSlingId = subSlingId;
         this.subAgentName = subAgentName;
-        this.idleCheck = new SubscriberIdle(idleMillies, 
SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS);
         this.callback = callback;
         this.poller = messagingProvider.createPoller(
                     topics.getCommandTopic(),
@@ -59,14 +57,12 @@ public class CommandPoller implements Closeable {
     }
 
     private void handleCommandMessage(MessageInfo info, ClearCommand command) {
-        idleCheck.busy(0);
         if (!subSlingId.equals(command.getSubSlingId()) || 
!subAgentName.equals(command.getSubAgentName())) {
             LOG.debug("Skip command for subSlingId {}", 
command.getSubSlingId());
             return;
         }
 
         handleClearCommand(command);
-        idleCheck.idle();
         callback.run();
     }
 
@@ -85,7 +81,4 @@ public class CommandPoller implements Closeable {
         IOUtils.closeQuietly(poller);
     }
 
-    public boolean isIdle() {
-        return idleCheck.isIdle();
-    }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 4d7d841..9e676d9 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -20,7 +20,6 @@ package org.apache.sling.distribution.journal.impl.subscriber;
 
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toSet;
@@ -95,7 +94,6 @@ public class DistributionSubscriber {
     static long RETRY_DELAY = SECONDS.toMillis(5);
     static long MAX_RETRY_DELAY = MINUTES.toMillis(15);
     static long QUEUE_FETCH_DELAY = SECONDS.toMillis(1);
-    private static final long COMMAND_NOT_IDLE_DELAY = 
MILLISECONDS.toMillis(200);
     private static final Supplier<LongSupplier> catchAllDelays = () -> 
exponential(RETRY_DELAY, MAX_RETRY_DELAY);
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DistributionSubscriber.class);
@@ -171,7 +169,7 @@ public class DistributionSubscriber {
 
         Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", 
SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
         if (config.editable()) {
-            commandPoller = new CommandPoller(messagingProvider, topics, 
subSlingId, subAgentName, idleMillies, delay::signal);
+            commandPoller = new CommandPoller(messagingProvider, topics, 
subSlingId, subAgentName, delay::signal);
         }
 
         if (config.subscriberIdleCheck()) {
@@ -304,11 +302,7 @@ public class DistributionSubscriber {
         LOG.info("Started Queue processor");
         while (running) {
             try {
-                if (commandPoller == null || commandPoller.isIdle()) {
-                    fetchAndProcessQueueItem();
-                } else {
-                    delay.await(COMMAND_NOT_IDLE_DELAY);
-                }
+                fetchAndProcessQueueItem();
             } catch (PreConditionTimeoutException e) {
                 // Precondition timed out. We only log this on info level as 
it is no error
                 LOG.info(e.getMessage());
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
index afcd824..417b97d 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -40,10 +40,6 @@ public class SubscriberIdle implements IdleCheck {
     private final ScheduledExecutorService executor;
     private ScheduledFuture<?> schedule;
 
-    public SubscriberIdle(int idleMillis, int forceIdleMillies) {
-        this(idleMillis, forceIdleMillies, new AtomicBoolean());
-    }
-    
     public SubscriberIdle(int idleMillis, int forceIdleMillies, AtomicBoolean 
readyHolder) {
         this.idleMillis = idleMillis;
         this.isReady = readyHolder;
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index ae6b310..bb26102 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -157,7 +157,7 @@ public class CommandPollerTest {
                 Mockito.eq(Reset.earliest), 
                 handlerCaptor.capture()))
             .thenReturn(poller);
-        commandPoller = new CommandPoller(clientProvider, topics, 
SUB_SLING_ID, SUB_AGENT_NAME, 1000, callback);
+        commandPoller = new CommandPoller(clientProvider, topics, 
SUB_SLING_ID, SUB_AGENT_NAME, callback);
         commandHandler = handlerCaptor.getValue().getHandler();
     }
 
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
index 471c119..2035bfe 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -22,6 +22,8 @@ import static 
org.apache.sling.distribution.journal.impl.subscriber.SubscriberId
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,7 +35,7 @@ public class SubscriberIdleTest {
 
     @Before
     public void before() {
-        idle = new SubscriberIdle(IDLE_MILLIES, 
SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS);
+        idle = new SubscriberIdle(IDLE_MILLIES, 
SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS, new AtomicBoolean());
     }
 
     @After
@@ -76,7 +78,7 @@ public class SubscriberIdleTest {
     
     @Test
     public void testStartIdle() throws InterruptedException {
-        idle = new SubscriberIdle(IDLE_MILLIES, 
SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS);
+        idle = new SubscriberIdle(IDLE_MILLIES, 
SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS, new AtomicBoolean());
         assertThat("Initial state", idle.isIdle(), equalTo(false));
         Thread.sleep(IDLE_MILLIES * 2);
         assertThat("State after time over idle limit", idle.isIdle(), 
equalTo(true));
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 5a191d4..045b47a 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -354,25 +354,6 @@ public class SubscriberTest {
         verifyNoStatusMessageSent();
     }
 
-    /**
-     *  We must make sure that a delete command in the queue is honored if 
possible.
-     */
-    @Test
-    public void testReceiveNotProcessedWhenDeleteCommandLate() throws 
Exception {
-        assumeNoPrecondition();
-        initSubscriber(ImmutableMap.of("editable", "true"));
-
-        MessageInfo info = createInfo(0l);
-        PackageMessage message = BASIC_ADD_PACKAGE;
-        packageHandler.handle(info, message);
-        
-        Thread.sleep(500);
-        ClearCommand clearCommand = 
ClearCommand.builder().offset(10).subAgentName(SUB1_AGENT_NAME).subSlingId(SUB1_SLING_ID).build();
-        commandHandler.handle(info, clearCommand);
-        
-        verifyStatusMessageSentWithStatus(Status.REMOVED);
-    }
-
     @Test
     public void testSendFailedStatus() throws DistributionException {
         assumeNoPrecondition();

Reply via email to