This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new a5f4bf6  SLING-9472 - Make Precondition return immediately (#45)
a5f4bf6 is described below

commit a5f4bf60ed6937c7aa28d9be52e453b59c7f8c6b
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Jun 30 14:31:45 2020 +0200

    SLING-9472 - Make Precondition return immediately (#45)
    
    * SLING-9472 - Make Precondition return immediately
    
    * SLING-9472 - Make test more robust
    
    * SLING-9472 - Fix exception
    
    * SLING-9472 - Add test case for precondition SKIP and small refactoring
    
    * SLING-9472 - Test that status message ist sent
    
    * SLING-9472 - Remove unused componentReg to avoid NPE
    
    * SLING-9472 - Return wait in case of interrupt
    
    * SLING-9472 - Add test for skipped packages
    
    * SLING-9472 - throw PreConditionTimeoutException on shutdown
    
    * SLING-9472 - reinterrupt
---
 .../impl/precondition/DefaultPrecondition.java     |   4 +-
 .../journal/impl/precondition/Precondition.java    |   5 +-
 .../impl/precondition/StagingPrecondition.java     |  48 +--------
 .../journal/impl/subscriber/BookKeeper.java        |  12 ++-
 .../impl/subscriber/DistributionSubscriber.java    |  46 +++++---
 .../PreConditionTimeoutException.java}             |  15 ++-
 .../journal/impl/subscriber/SubscriberIdle.java    |   4 +-
 .../impl/precondition/DefaultPreconditionTest.java |   6 +-
 .../precondition/PackageStatusWatcherTest.java     |   2 +-
 .../impl/precondition/StagingPreconditionTest.java |  53 +++------
 .../impl/publisher/DistributionPublisherTest.java  |   2 +-
 .../journal/impl/queue/impl/PubQueueCacheTest.java |   6 --
 .../journal/impl/shared/LocalStoreTest.java        |   1 -
 .../journal/impl/shared/PackageRetriesTest.java    |   2 -
 .../journal/impl/subscriber/SubscriberTest.java    | 119 ++++++++++++++-------
 15 files changed, 162 insertions(+), 163 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
index dceed02..5144378 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
@@ -23,7 +23,7 @@ import org.osgi.service.component.annotations.Component;
 @Component(immediate = true, service = Precondition.class, property = { 
"name=default" })
 public class DefaultPrecondition implements Precondition {
     @Override
-    public boolean canProcess(String subAgentName, long pkgOffset, int 
timeoutSeconds) {
-        return true;
+    public Decision canProcess(String subAgentName, long pkgOffset) {
+        return Decision.ACCEPT;
     }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
index 69da7b4..e0a201c 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
@@ -30,9 +30,10 @@ public interface Precondition {
      * @param pkgOffset the offset of the package
      * @param timeoutSeconds max seconds to wait until returning
      * @throws TimeoutException if the timeout expired without being able to 
determine status
-     * @throws IllegalStateException if the precondition can't be evaluated
+     * @throws InterruptedException if the thread was interrupted and should 
shut down
      * @return true if the package can be processed; otherwise it returns 
false.
      */
-    boolean canProcess(String subAgentName, long pkgOffset, int 
timeoutSeconds) throws TimeoutException;
+    Decision canProcess(String subAgentName, long pkgOffset);
 
+    enum Decision { ACCEPT, SKIP, WAIT}
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index 9b55848..be27fe2 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -21,8 +21,6 @@ package 
org.apache.sling.distribution.journal.impl.precondition;
 import static 
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
 import static 
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
 
-import java.util.concurrent.TimeoutException;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
@@ -48,8 +46,6 @@ public class StagingPrecondition implements Precondition, 
Runnable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StagingPrecondition.class);
 
-    private static final long STATUS_CHECK_DELAY_MS = 100;
-
     @Reference
     private MessagingProvider messagingProvider;
 
@@ -58,8 +54,6 @@ public class StagingPrecondition implements Precondition, 
Runnable {
 
     private volatile PackageStatusWatcher watcher;
 
-    private volatile boolean running = true;
-    
     @Activate
     public void activate() {
         watcher = new PackageStatusWatcher(messagingProvider, topics);
@@ -68,40 +62,16 @@ public class StagingPrecondition implements Precondition, 
Runnable {
 
     @Deactivate
     public synchronized void deactivate() {
-
-        /*
-         * Note that we don't interrupt blocking calls using Thread.interrupt()
-         * because interrupts can stop the Apache Oak repository.
-         *
-         * See SLING-9340, OAK-2609 and 
https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
-         */
-
         IOUtils.closeQuietly(watcher);
-        running = false;
     }
 
     @Override
-    public boolean canProcess(String subAgentName, long pkgOffset, int 
timeoutSeconds) throws TimeoutException {
-        if (timeoutSeconds < 1) {
-            throw new IllegalArgumentException();
-        }
-
-        // try to get the status for timeoutSeconds and then throw
-        for(int i = 0; running && i < timeoutSeconds * 10 ; i++) {
-            Status status = getStatus(subAgentName, pkgOffset);
-            if (status != null) {
-                return status == Status.IMPORTED;
-            } else {
-                delayStatusCheck();
-            }
+    public Decision canProcess(String subAgentName, long pkgOffset) {
+        Status status = getStatus(subAgentName, pkgOffset);
+        if (status == null) {
+            return Decision.WAIT;
         }
-
-        if (!running) {
-            throw new IllegalStateException("Staging precondition is shutting 
down");
-        }
-
-        throw new TimeoutException("Timeout waiting for package offset " + 
pkgOffset + " on status topic.");
-
+        return status == Status.IMPORTED ? Decision.ACCEPT : Decision.SKIP;
     }
 
     private synchronized Status getStatus(String subAgentName, long pkgOffset) 
{
@@ -114,12 +84,4 @@ public class StagingPrecondition implements Precondition, 
Runnable {
         watcher = new PackageStatusWatcher(messagingProvider, topics);
     }
 
-    private static void delayStatusCheck() {
-        try {
-            Thread.sleep(STATUS_CHECK_DELAY_MS);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 0541b41..18fe950 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -73,11 +73,13 @@ import org.slf4j.MDC;
  * agent on the leader instance.
  */
 public class BookKeeper implements Closeable {
-    private static final String KEY_OFFSET = "offset";
+    static final String STORE_TYPE_PACKAGE = "packages";
+    static final String STORE_TYPE_STATUS = "statuses";
+    static final String KEY_OFFSET = "offset";
+    static final int COMMIT_AFTER_NUM_SKIPPED = 10;
     private static final String SUBSERVICE_IMPORTER = "importer";
     private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
     private static final int RETRY_SEND_DELAY = 1000;
-    private static final int COMMIT_AFTER_NUM_SKIPPED = 10;
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
     private final ResourceResolverFactory resolverFactory;
@@ -120,8 +122,8 @@ public class BookKeeper implements Closeable {
         // Error queues are enabled when the number
         // of retry attempts is limited ; disabled otherwise
         this.errorQueueEnabled = (maxRetries >= 0);
-        this.statusStore = new LocalStore(resolverFactory, "statuses", 
subAgentName);
-        this.processedOffsets = new LocalStore(resolverFactory, "packages", 
subAgentName);
+        this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, 
subAgentName);
+        this.processedOffsets = new LocalStore(resolverFactory, 
STORE_TYPE_PACKAGE, subAgentName);
     }
     
     /**
@@ -281,7 +283,7 @@ public class BookKeeper implements Closeable {
     }
     
     public long loadOffset() {
-        return  processedOffsets.load(KEY_OFFSET, -1L);
+        return processedOffsets.load(KEY_OFFSET, -1L);
     }
 
     public int getRetries(String pubAgentName) {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 9a5625b..b739dcb 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -33,7 +33,6 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
@@ -48,7 +47,6 @@ import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.agent.DistributionAgentState;
-import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.HandlerAdapter;
@@ -57,6 +55,7 @@ import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
+import 
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
@@ -64,7 +63,6 @@ import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.settings.SlingSettingsService;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -124,8 +122,6 @@ public class DistributionSubscriber {
     
     private Optional<SubscriberIdle> subscriberIdle;
     
-    private ServiceRegistration<DistributionAgent> componentReg;
-
     private Closeable packagePoller;
 
     private CommandPoller commandPoller;
@@ -145,6 +141,7 @@ public class DistributionSubscriber {
     private String pkgType;
 
     private volatile boolean running = true;
+    private Thread queueThread;
 
     @Activate
     public void activate(SubscriberConfiguration config, BundleContext 
context, Map<String, Object> properties) {
@@ -188,7 +185,7 @@ public class DistributionSubscriber {
 
         commandPoller = new CommandPoller(messagingProvider, topics, 
subSlingId, subAgentName, editable);
 
-        startBackgroundThread(this::processQueue,
+        queueThread = startBackgroundThread(this::processQueue,
                 format("Queue Processor for Subscriber agent %s", 
subAgentName));
 
         int announceDelay = 
PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
@@ -216,11 +213,16 @@ public class DistributionSubscriber {
          * See SLING-9340, OAK-2609 and 
https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
          */
 
-        componentReg.unregister();
         IOUtils.closeQuietly(announcer, bookKeeper, 
                 packagePoller, commandPoller);
         subscriberIdle.ifPresent(IOUtils::closeQuietly);
         running = false;
+        try {
+            queueThread.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.info("Join interrupted");
+        }
         String msg = String.format(
                 "Stopped Subscriber agent %s, subscribed to Publisher agent 
names %s with package builder %s",
                 subAgentName, queueNames, pkgType);
@@ -299,12 +301,12 @@ public class DistributionSubscriber {
             }
 
             try (Timer.Context context = 
distributionMetricsService.getProcessQueueItemDuration().time()) {
-                processQueueItem(item.get().getInfo(), 
item.get().getMessage());
+                processQueueItem(item.get());
             } finally {
                 subscriberIdle.ifPresent(SubscriberIdle::idle);
             }
 
-        } catch (TimeoutException e) {
+        } catch (PreConditionTimeoutException e) {
             // Precondition timed out. We only log this on info level as it is 
no error
             LOG.info(e.getMessage());
             delay(RETRY_DELAY);
@@ -344,9 +346,10 @@ public class DistributionSubscriber {
         return Optional.empty();
     }
 
-    private void processQueueItem(MessageInfo info, PackageMessage queueItem) 
throws PersistenceException, LoginException, DistributionException, 
TimeoutException {
+    private void processQueueItem(FullMessage<PackageMessage> item) throws 
PersistenceException, LoginException, DistributionException {
+        MessageInfo info = item.getInfo();
+        PackageMessage pkgMsg = item.getMessage();
         long offset = info.getOffset();
-        PackageMessage pkgMsg = queueItem;
         boolean skip = shouldSkip(offset);
         subscriberIdle.ifPresent(SubscriberIdle::busy);
         if (skip) {
@@ -359,8 +362,24 @@ public class DistributionSubscriber {
         distributionMetricsService.getItemsBufferSize().decrement();
     }
 
-    private boolean shouldSkip(long offset) throws TimeoutException {
-        return commandPoller.isCleared(offset) || 
!precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
+    private boolean shouldSkip(long offset) {
+        boolean cleared = commandPoller.isCleared(offset);
+        Decision decision = waitPrecondition(offset);
+        return cleared || decision == Decision.SKIP;
+    }
+
+    private Decision waitPrecondition(long offset) {
+        Decision decision = Precondition.Decision.WAIT;
+        long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT * 
1000;
+        while (decision == Decision.WAIT && System.currentTimeMillis() < 
endTime && running) {
+            decision = precondition.canProcess(subAgentName, offset);
+            if (decision == Decision.WAIT) {
+                delay(100);
+            } else {
+                return decision;
+            }
+        }
+        throw new PreConditionTimeoutException("Timeout waiting for package 
offset " + offset + " on status topic.");
     }
 
     private static void delay(long delayInMs) {
@@ -370,5 +389,4 @@ public class DistributionSubscriber {
             Thread.currentThread().interrupt();
         }
     }
-
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
similarity index 67%
copy from 
src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
copy to 
src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
index dceed02..684ab9d 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
@@ -16,14 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.precondition;
+package org.apache.sling.distribution.journal.impl.subscriber;
 
-import org.osgi.service.component.annotations.Component;
-
-@Component(immediate = true, service = Precondition.class, property = { 
"name=default" })
-public class DefaultPrecondition implements Precondition {
-    @Override
-    public boolean canProcess(String subAgentName, long pkgOffset, int 
timeoutSeconds) {
-        return true;
+public class PreConditionTimeoutException extends RuntimeException {
+    public PreConditionTimeoutException(String msg) {
+        super(msg);
     }
+
+    private static final long serialVersionUID = 6286011641627241560L;
+
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
index c7af419..3220a31 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -79,7 +79,9 @@ public class SubscriberIdle implements SystemReadyCheck, 
Closeable {
     public synchronized void idle() {
         if (!isReady.get()) {
             cancelSchedule();
-            schedule = executor.schedule(this::ready, idleMillis, 
TimeUnit.MILLISECONDS);
+            if (!executor.isShutdown()) {
+                schedule = executor.schedule(this::ready, idleMillis, 
TimeUnit.MILLISECONDS);
+            }
         }
     }
     
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
index f279265..c44bf13 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
@@ -21,13 +21,13 @@ package 
org.apache.sling.distribution.journal.impl.precondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
-import 
org.apache.sling.distribution.journal.impl.precondition.DefaultPrecondition;
+import 
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
 import org.junit.Test;
 
 public class DefaultPreconditionTest {
     @Test
     public void testAlwaysTrue() {
-        boolean canProcess = new DefaultPrecondition().canProcess("any", 100, 
10);
-        assertThat(canProcess, equalTo(true));
+        Decision decision = new DefaultPrecondition().canProcess("any", 100);
+        assertThat(decision, equalTo(Decision.ACCEPT));
     }
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index f882f32..bf771ac 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -89,7 +89,7 @@ public class PackageStatusWatcherTest {
     void generateMessages(int begin, int end) {
         MessageHandler<PackageStatusMessage> handler = 
adapterCaptor.getValue().getHandler();
         for (int i=begin; i<end; i++) {
-            handler.handle(new TestMessageInfo(TOPIC_NAME, 0, i, 0L),
+            handler.handle(new TestMessageInfo(TOPIC_NAME, 0, i, 0l),
                     createStatusMessage(i));
         }
     }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index 3a31224..f64d24d 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -19,22 +19,18 @@
 package org.apache.sling.distribution.journal.impl.precondition;
 
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
+import 
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
@@ -56,7 +52,7 @@ public class StagingPreconditionTest {
     private static final String SUB1_SLING_ID = "sub1sling";
     private static final String GP_SUB1_AGENT_NAME = "gpsub1agent";
     private static final String PUB1_AGENT_NAME = "pub1agent";
-    private static final Long OFFSET_NOT_PRESENT = 111111L;
+    private static final Long OFFSET_NOT_PRESENT = 111111l;
 
     @Mock
     private MessagingProvider clientProvider;
@@ -88,47 +84,28 @@ public class StagingPreconditionTest {
         statusHandler = statusCaptor.getValue().getHandler();
     }
     
-    @Test(expected = IllegalArgumentException.class)
-    public void testIllegalTimeout() throws InterruptedException, 
TimeoutException {
-        precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, -1);
-    }
-    
-    @Test(expected = TimeoutException.class)
+    @Test
     public void testNotYetProcessed() throws InterruptedException, 
TimeoutException {
         simulateMessage(OTHER_AGENT, 1002, 
PackageStatusMessage.Status.IMPORTED);
-        boolean res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT, 
1);
-        assertThat(res, equalTo(true));
+        Decision res = precondition.canProcess(OTHER_AGENT, 
OFFSET_NOT_PRESENT);
+        assertThat(res, equalTo(Decision.WAIT));
+
+        Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, 
OFFSET_NOT_PRESENT);
+        assertThat(res2, equalTo(Decision.WAIT));
 
-        // We got no package for this agent. So this should time out
-        precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 1);
     }
     
     @Test
-    public void testDeactivateDuringCanProcess() {
-        AtomicReference<Throwable> exHolder = new AtomicReference<>();
-        Thread th = new Thread(() -> {
-            try {
-                precondition.canProcess(GP_SUB1_AGENT_NAME, 
OFFSET_NOT_PRESENT, 2);
-            } catch (Throwable t) {
-                exHolder.set(t);
-            }
-        });
-        th.start();
-        precondition.deactivate();
-        Throwable ex = Awaitility.await().until(exHolder::get, notNullValue());
-        assertThat(ex, instanceOf(IllegalStateException.class));
-    }
-    
-    @Test(expected = TimeoutException.class)
     public void testCleanup() throws InterruptedException, TimeoutException {
         simulateMessage(GP_SUB1_AGENT_NAME, 1002, 
PackageStatusMessage.Status.IMPORTED);
-        assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
+        Decision res = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002);
+        assertThat(res, equalTo(Decision.ACCEPT));
         
         // Cleanup
         precondition.run();
         
-        // Should time out because after cleanup message is not present anymore
-        precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1);
+        Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002);
+        assertThat(res2, equalTo(Decision.WAIT));
     }
     
     @Test
@@ -137,9 +114,9 @@ public class StagingPreconditionTest {
         simulateMessage(GP_SUB1_AGENT_NAME, 1001, 
PackageStatusMessage.Status.REMOVED);
         simulateMessage(GP_SUB1_AGENT_NAME, 1002, 
PackageStatusMessage.Status.IMPORTED);
 
-        assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000, 1));
-        assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001, 1));
-        assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
+        assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000), 
equalTo(Decision.SKIP));
+        assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001), 
equalTo(Decision.SKIP));
+        assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002), 
equalTo(Decision.ACCEPT));
     }
 
     private void simulateMessage(String subAgentName, long pkgOffset, 
PackageStatusMessage.Status status) {
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index fcaddae..78ddc6d 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -268,7 +268,7 @@ public class DistributionPublisherTest {
         Counter counter = new TestCounter();
         
when(distributionMetricsService.getQueueAccessErrorCount()).thenReturn(counter);
         try {
-            DistributionQueue queue = publisher.getQueue(QUEUE_NAME);
+            publisher.getQueue(QUEUE_NAME);
             fail("Expected exception not thrown");
         } catch (RuntimeException expectedException) {
         }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 957062e..9867412 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -39,7 +39,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.LongStream;
 
-import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.metrics.Counter;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
@@ -52,7 +51,6 @@ import 
org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
@@ -73,8 +71,6 @@ public class PubQueueCacheTest {
 
     private static final String TOPIC = "package_topic";
 
-    private static final String PUB_SLING_ID = 
"79fd948e-9435-4128-b42f-32327ba21df3";
-
     private static final String PUB_AGENT_NAME_1 = "pubAgentName1";
 
     private static final String PUB_AGENT_NAME_2 = "pubAgentName2";
@@ -110,8 +106,6 @@ public class PubQueueCacheTest {
     @Mock
     private Closeable poller;
 
-    private ResourceResolverFactory resolverFactory = new 
MockResourceResolverFactory();
-
     private PubQueueCache cache;
 
     private ExecutorService executor;
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java
index f95c470..cd0dc5d 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.assertEquals;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.distribution.journal.impl.shared.LocalStore;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Test;
 
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java
index 8e7adb9..8229ad3 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java
@@ -22,8 +22,6 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.sling.distribution.journal.impl.shared.PackageRetries;
-
 public class PackageRetriesTest {
 
     @Test
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index ba955d2..5070b7e 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -21,14 +21,14 @@ package 
org.apache.sling.distribution.journal.impl.subscriber;
 import static org.apache.sling.distribution.agent.DistributionAgentState.IDLE;
 import static 
org.apache.sling.distribution.agent.DistributionAgentState.RUNNING;
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -47,16 +47,21 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
+import 
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.LocalStore;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
+import 
org.apache.sling.distribution.journal.impl.subscriber.BookKeeper.PackageStatus;
 import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.api.resource.ResourceUtil;
@@ -210,28 +215,46 @@ public class SubscriberTest {
     @After
     public void after() throws IOException {
         subscriber.deactivate();
-        verify(poller).close();
+        verify(poller, atLeastOnce()).close();
+    }
+    
+    @Test
+    public void testReceiveNotSubscribed() throws DistributionException {
+        assumeNoPrecondition();
+        initSubscriber(ImmutableMap.of("agentNames", "dummy"));
+        assertThat(subscriber.getState(), 
equalTo(DistributionAgentState.IDLE));
+        
+        MessageInfo info = new TestMessageInfo("", 1, 100, 0);
+        PackageMessage message = BASIC_ADD_PACKAGE;
+        
+        packageHandler.handle(info, message);
+        verify(packageBuilder, 
timeout(1000).times(0)).installPackage(Mockito.any(ResourceResolver.class), 
+                Mockito.any(ByteArrayInputStream.class));
+        assertThat(getStoredOffset(), nullValue());
+        for (int c=0; c < BookKeeper.COMMIT_AFTER_NUM_SKIPPED; c++) {
+            packageHandler.handle(info, message);
+        }
+        assertThat(getStoredOffset(), equalTo(100l));
     }
     
     @Test
     public void testReceive() throws DistributionException {
         assumeNoPrecondition();
         initSubscriber();
-
         assertThat(subscriber.getState(), 
equalTo(DistributionAgentState.IDLE));
         
         MessageInfo info = new TestMessageInfo("", 1, 0, 0);
-
         PackageMessage message = BASIC_ADD_PACKAGE;
-
         final Semaphore sem = new Semaphore(0);
         
when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class), 
                 Mockito.any(ByteArrayInputStream.class))
                 ).thenAnswer(new WaitFor(sem));
+        
         packageHandler.handle(info, message);
         
         waitSubscriber(RUNNING);
         sem.release();
+        
         waitSubscriber(IDLE);
         verify(statusSender, times(0)).accept(anyObject());
     }
@@ -241,23 +264,21 @@ public class SubscriberTest {
         assumeNoPrecondition();
         initSubscriber();
 
-        try (ResourceResolver resolver = 
resolverFactory.getServiceResourceResolver(null)) {
-            ResourceUtil.getOrCreateResource(resolver, "/test","sling:Folder", 
"sling:Folder", true);
-        }
+        createResource("/test");
         MessageInfo info = new TestMessageInfo("", 1, 0, 0);
-
         PackageMessage message = BASIC_DEL_PACKAGE;
         final Semaphore sem = new Semaphore(0);
         when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class),
                 Mockito.any(ByteArrayInputStream.class))
         ).thenAnswer(new WaitFor(sem));
+        
         packageHandler.handle(info, message);
+        
         waitSubscriber(RUNNING);
         sem.release();
+        
         waitSubscriber(IDLE);
-        try (ResourceResolver resolver = 
resolverFactory.getServiceResourceResolver(null)) {
-            assertThat(resolver.getResource("/test"), nullValue());
-        }
+        assertThat(getResource("/test"), nullValue());
     }
 
     @Test
@@ -267,53 +288,57 @@ public class SubscriberTest {
 
         MessageInfo info = new TestMessageInfo("", 1, 0, 0);
         PackageMessage message = BASIC_ADD_PACKAGE;
-
         when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class),
                 Mockito.any(ByteArrayInputStream.class))
         ).thenThrow(new RuntimeException("Expected"));
 
         packageHandler.handle(info, message);
+        
         verify(statusSender, timeout(10000).times(1)).accept(anyObject());
     }
 
     @Test
     public void testSendSuccessStatus() throws DistributionException, 
InterruptedException {
         assumeNoPrecondition();
+        // Only editable subscriber will send status
         initSubscriber(ImmutableMap.of("editable", "true"));
 
         MessageInfo info = new TestMessageInfo("", 1, 0, 0);
         PackageMessage message = BASIC_ADD_PACKAGE;
 
         packageHandler.handle(info, message);
+        
         waitSubscriber(IDLE);
-
         verify(statusSender, timeout(10000).times(1)).accept(anyObject());
     }
 
     @Test
-    public void testSkipOnRemovedStatus() throws DistributionException, 
InterruptedException, TimeoutException {
-        assumeNoPrecondition();
-        initSubscriber();
+    public void testSkipBecauseOfPrecondition() throws DistributionException, 
InterruptedException, TimeoutException {
+        when(precondition.canProcess(eq(SUB1_AGENT_NAME), 
anyLong())).thenReturn(Decision.SKIP);
+        initSubscriber(ImmutableMap.of("editable", "true"));
         MessageInfo info = new TestMessageInfo("", 1, 11, 0);
         PackageMessage message = BASIC_ADD_PACKAGE;
 
         packageHandler.handle(info, message);
-        waitSubscriber(RUNNING);
-        when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), 
anyInt())).thenReturn(false);
-
-        try {
-            waitSubscriber(IDLE);
-            fail("Cannot be IDLE without a validation status");
-        } catch (Throwable t) {
-
-        }
-
-        when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), 
anyInt())).thenReturn(true);
-        waitSubscriber(IDLE);
-
+        
+        await().until(this::getStatus, 
equalTo(PackageStatusMessage.Status.REMOVED));
+        verify(statusSender, timeout(10000).times(1)).accept(anyObject());
     }
     
     @Test
+    public void testPreconditionTimeoutExceptionBecauseOfShutdown() throws 
DistributionException, InterruptedException, TimeoutException, IOException {
+        when(precondition.canProcess(eq(SUB1_AGENT_NAME), 
anyLong())).thenReturn(Decision.WAIT);
+        initSubscriber(ImmutableMap.of("editable", "true"));
+        MessageInfo info = new TestMessageInfo("", 1, 11, 0);
+        PackageMessage message = BASIC_ADD_PACKAGE;
+
+        long startedAt = System.currentTimeMillis();
+        packageHandler.handle(info, message);
+        subscriber.deactivate();
+        assertThat("After deactivate precondition should time out quickly.", 
System.currentTimeMillis() - startedAt, lessThan(1000l));
+    }
+
+    @Test
     public void testReadyWhenWatingForPrecondition() {
         Semaphore sem = new Semaphore(0);
         assumeWaitingForPrecondition(sem);
@@ -326,6 +351,28 @@ public class SubscriberTest {
         await("Should report ready").until(() -> 
subscriberReadyStore.getReadyHolder(SUB1_AGENT_NAME).get());
         sem.release();
     }
+    
+    private Long getStoredOffset() {
+        LocalStore store = new LocalStore(resolverFactory, 
BookKeeper.STORE_TYPE_PACKAGE, SUB1_AGENT_NAME);
+        return store.load(BookKeeper.KEY_OFFSET, Long.class);
+    }
+
+    private Status getStatus() {
+        LocalStore statusStore = new LocalStore(resolverFactory, 
BookKeeper.STORE_TYPE_STATUS, SUB1_AGENT_NAME);
+        return new PackageStatus(statusStore.load()).status;
+    }
+
+    private void createResource(String path) throws PersistenceException, 
LoginException {
+        try (ResourceResolver resolver = 
resolverFactory.getServiceResourceResolver(null)) {
+            ResourceUtil.getOrCreateResource(resolver, path,"sling:Folder", 
"sling:Folder", true);
+        }
+    }
+
+    private Resource getResource(String path) throws LoginException {
+        try (ResourceResolver resolver = 
resolverFactory.getServiceResourceResolver(null)) {
+            return resolver.getResource(path);
+        }
+    }
 
     private void initSubscriber() {
         initSubscriber(Collections.emptyMap());
@@ -348,7 +395,7 @@ public class SubscriberTest {
     private void waitSubscriber(DistributionAgentState expectedState) {
         await().until(subscriber::getState, equalTo(expectedState));
     }
-
+    
     private void mockMetrics() {
         Histogram histogram = Mockito.mock(Histogram.class);
         Counter counter = Mockito.mock(Counter.class);
@@ -379,7 +426,7 @@ public class SubscriberTest {
 
     private void assumeNoPrecondition() {
         try {
-            when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), 
anyInt())).thenReturn(true);
+            when(precondition.canProcess(eq(SUB1_AGENT_NAME), 
anyLong())).thenReturn(Decision.ACCEPT);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -387,8 +434,8 @@ public class SubscriberTest {
 
     private void assumeWaitingForPrecondition(Semaphore sem) {
         try {
-            when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), 
anyInt()))
-                .thenAnswer(invocation -> sem.tryAcquire(10000, 
TimeUnit.SECONDS));
+            when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong()))
+                .thenAnswer(invocation -> sem.tryAcquire(10000, 
TimeUnit.SECONDS) ? Decision.ACCEPT : Decision.SKIP);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }

Reply via email to