This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-9472-2
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 2d5e7c7688e3d1c1aac80aea40946bd8cebc0f11
Author: Christian Schneider <[email protected]>
AuthorDate: Fri Jun 26 16:47:05 2020 +0200

    SLING-9472 - Make Precondition return immediately
---
 .../impl/precondition/DefaultPrecondition.java     |  4 +-
 .../journal/impl/precondition/Precondition.java    |  5 +-
 .../impl/precondition/StagingPrecondition.java     | 48 ++------------------
 .../impl/subscriber/DistributionSubscriber.java    | 30 ++++++++++--
 .../PreConditionTimeoutException.java}             | 15 +++---
 .../impl/precondition/DefaultPreconditionTest.java |  6 +--
 .../precondition/PackageStatusWatcherTest.java     |  2 +-
 .../impl/precondition/StagingPreconditionTest.java | 53 ++++++----------------
 .../journal/impl/queue/impl/PubQueueCacheTest.java |  6 ---
 .../journal/impl/shared/LocalStoreTest.java        |  1 -
 .../journal/impl/shared/PackageRetriesTest.java    |  2 -
 .../journal/impl/subscriber/SubscriberTest.java    | 24 ++++++----
 12 files changed, 76 insertions(+), 120 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
index dceed02..5144378 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
@@ -23,7 +23,7 @@ import org.osgi.service.component.annotations.Component;
 @Component(immediate = true, service = Precondition.class, property = { 
"name=default" })
 public class DefaultPrecondition implements Precondition {
     @Override
-    public boolean canProcess(String subAgentName, long pkgOffset, int 
timeoutSeconds) {
-        return true;
+    public Decision canProcess(String subAgentName, long pkgOffset) {
+        return Decision.ACCEPT;
     }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
index 69da7b4..d934431 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
@@ -30,9 +30,10 @@ public interface Precondition {
      * @param pkgOffset the offset of the package
      * @param timeoutSeconds max seconds to wait until returning
      * @throws TimeoutException if the timeout expired without being able to 
determine status
-     * @throws IllegalStateException if the precondition can't be evaluated
+     * @throws InterruptedException if the thread was interrupted and should 
shut down
      * @return true if the package can be processed; otherwise it returns 
false.
      */
-    boolean canProcess(String subAgentName, long pkgOffset, int 
timeoutSeconds) throws TimeoutException;
+    Decision canProcess(String subAgentName, long pkgOffset);
 
+    enum Decision { ACCEPT, SKIP, WAIT};
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index 9b55848..be27fe2 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -21,8 +21,6 @@ package 
org.apache.sling.distribution.journal.impl.precondition;
 import static 
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
 import static 
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
 
-import java.util.concurrent.TimeoutException;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
@@ -48,8 +46,6 @@ public class StagingPrecondition implements Precondition, 
Runnable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StagingPrecondition.class);
 
-    private static final long STATUS_CHECK_DELAY_MS = 100;
-
     @Reference
     private MessagingProvider messagingProvider;
 
@@ -58,8 +54,6 @@ public class StagingPrecondition implements Precondition, 
Runnable {
 
     private volatile PackageStatusWatcher watcher;
 
-    private volatile boolean running = true;
-    
     @Activate
     public void activate() {
         watcher = new PackageStatusWatcher(messagingProvider, topics);
@@ -68,40 +62,16 @@ public class StagingPrecondition implements Precondition, 
Runnable {
 
     @Deactivate
     public synchronized void deactivate() {
-
-        /*
-         * Note that we don't interrupt blocking calls using Thread.interrupt()
-         * because interrupts can stop the Apache Oak repository.
-         *
-         * See SLING-9340, OAK-2609 and 
https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
-         */
-
         IOUtils.closeQuietly(watcher);
-        running = false;
     }
 
     @Override
-    public boolean canProcess(String subAgentName, long pkgOffset, int 
timeoutSeconds) throws TimeoutException {
-        if (timeoutSeconds < 1) {
-            throw new IllegalArgumentException();
-        }
-
-        // try to get the status for timeoutSeconds and then throw
-        for(int i = 0; running && i < timeoutSeconds * 10 ; i++) {
-            Status status = getStatus(subAgentName, pkgOffset);
-            if (status != null) {
-                return status == Status.IMPORTED;
-            } else {
-                delayStatusCheck();
-            }
+    public Decision canProcess(String subAgentName, long pkgOffset) {
+        Status status = getStatus(subAgentName, pkgOffset);
+        if (status == null) {
+            return Decision.WAIT;
         }
-
-        if (!running) {
-            throw new IllegalStateException("Staging precondition is shutting 
down");
-        }
-
-        throw new TimeoutException("Timeout waiting for package offset " + 
pkgOffset + " on status topic.");
-
+        return status == Status.IMPORTED ? Decision.ACCEPT : Decision.SKIP;
     }
 
     private synchronized Status getStatus(String subAgentName, long pkgOffset) 
{
@@ -114,12 +84,4 @@ public class StagingPrecondition implements Precondition, 
Runnable {
         watcher = new PackageStatusWatcher(messagingProvider, topics);
     }
 
-    private static void delayStatusCheck() {
-        try {
-            Thread.sleep(STATUS_CHECK_DELAY_MS);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 9a5625b..8047df8 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -57,6 +57,7 @@ import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
+import 
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
@@ -299,7 +300,7 @@ public class DistributionSubscriber {
             }
 
             try (Timer.Context context = 
distributionMetricsService.getProcessQueueItemDuration().time()) {
-                processQueueItem(item.get().getInfo(), 
item.get().getMessage());
+                processQueueItem(item.get());
             } finally {
                 subscriberIdle.ifPresent(SubscriberIdle::idle);
             }
@@ -344,9 +345,10 @@ public class DistributionSubscriber {
         return Optional.empty();
     }
 
-    private void processQueueItem(MessageInfo info, PackageMessage queueItem) 
throws PersistenceException, LoginException, DistributionException, 
TimeoutException {
+    private void processQueueItem(FullMessage<PackageMessage> item) throws 
PersistenceException, LoginException, DistributionException, TimeoutException {
+        MessageInfo info = item.getInfo();
+        PackageMessage pkgMsg = item.getMessage();
         long offset = info.getOffset();
-        PackageMessage pkgMsg = queueItem;
         boolean skip = shouldSkip(offset);
         subscriberIdle.ifPresent(SubscriberIdle::busy);
         if (skip) {
@@ -360,7 +362,9 @@ public class DistributionSubscriber {
     }
 
     private boolean shouldSkip(long offset) throws TimeoutException {
-        return commandPoller.isCleared(offset) || 
!precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
+        boolean cleared = commandPoller.isCleared(offset);
+        Decision decision = waitPrecondition(offset);
+        return cleared || decision == Decision.SKIP;
     }
 
     private static void delay(long delayInMs) {
@@ -371,4 +375,22 @@ public class DistributionSubscriber {
         }
     }
 
+    private Decision waitPrecondition(long offset) {
+        Decision decision = Precondition.Decision.WAIT;
+        long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT * 
1000;
+        while (decision == Decision.WAIT && System.currentTimeMillis() < 
endTime) {
+            decision = precondition.canProcess(subAgentName, offset);
+            if (decision == Decision.WAIT) {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return Decision.SKIP;
+                }
+            } else {
+                return decision;
+            }
+        }
+        throw new PreConditionTimeoutException("Timeout waiting for package 
offset " + offset + " on status topic.");
+    }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
similarity index 67%
copy from 
src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
copy to 
src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
index dceed02..684ab9d 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
@@ -16,14 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.precondition;
+package org.apache.sling.distribution.journal.impl.subscriber;
 
-import org.osgi.service.component.annotations.Component;
-
-@Component(immediate = true, service = Precondition.class, property = { 
"name=default" })
-public class DefaultPrecondition implements Precondition {
-    @Override
-    public boolean canProcess(String subAgentName, long pkgOffset, int 
timeoutSeconds) {
-        return true;
+public class PreConditionTimeoutException extends RuntimeException {
+    public PreConditionTimeoutException(String msg) {
+        super(msg);
     }
+
+    private static final long serialVersionUID = 6286011641627241560L;
+
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
index f279265..c44bf13 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
@@ -21,13 +21,13 @@ package 
org.apache.sling.distribution.journal.impl.precondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
-import 
org.apache.sling.distribution.journal.impl.precondition.DefaultPrecondition;
+import 
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
 import org.junit.Test;
 
 public class DefaultPreconditionTest {
     @Test
     public void testAlwaysTrue() {
-        boolean canProcess = new DefaultPrecondition().canProcess("any", 100, 
10);
-        assertThat(canProcess, equalTo(true));
+        Decision decision = new DefaultPrecondition().canProcess("any", 100);
+        assertThat(decision, equalTo(Decision.ACCEPT));
     }
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index f882f32..bf771ac 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -89,7 +89,7 @@ public class PackageStatusWatcherTest {
     void generateMessages(int begin, int end) {
         MessageHandler<PackageStatusMessage> handler = 
adapterCaptor.getValue().getHandler();
         for (int i=begin; i<end; i++) {
-            handler.handle(new TestMessageInfo(TOPIC_NAME, 0, i, 0L),
+            handler.handle(new TestMessageInfo(TOPIC_NAME, 0, i, 0l),
                     createStatusMessage(i));
         }
     }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index 3a31224..f64d24d 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -19,22 +19,18 @@
 package org.apache.sling.distribution.journal.impl.precondition;
 
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
+import 
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
@@ -56,7 +52,7 @@ public class StagingPreconditionTest {
     private static final String SUB1_SLING_ID = "sub1sling";
     private static final String GP_SUB1_AGENT_NAME = "gpsub1agent";
     private static final String PUB1_AGENT_NAME = "pub1agent";
-    private static final Long OFFSET_NOT_PRESENT = 111111L;
+    private static final Long OFFSET_NOT_PRESENT = 111111l;
 
     @Mock
     private MessagingProvider clientProvider;
@@ -88,47 +84,28 @@ public class StagingPreconditionTest {
         statusHandler = statusCaptor.getValue().getHandler();
     }
     
-    @Test(expected = IllegalArgumentException.class)
-    public void testIllegalTimeout() throws InterruptedException, 
TimeoutException {
-        precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, -1);
-    }
-    
-    @Test(expected = TimeoutException.class)
+    @Test
     public void testNotYetProcessed() throws InterruptedException, 
TimeoutException {
         simulateMessage(OTHER_AGENT, 1002, 
PackageStatusMessage.Status.IMPORTED);
-        boolean res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT, 
1);
-        assertThat(res, equalTo(true));
+        Decision res = precondition.canProcess(OTHER_AGENT, 
OFFSET_NOT_PRESENT);
+        assertThat(res, equalTo(Decision.WAIT));
+
+        Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, 
OFFSET_NOT_PRESENT);
+        assertThat(res2, equalTo(Decision.WAIT));
 
-        // We got no package for this agent. So this should time out
-        precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 1);
     }
     
     @Test
-    public void testDeactivateDuringCanProcess() {
-        AtomicReference<Throwable> exHolder = new AtomicReference<>();
-        Thread th = new Thread(() -> {
-            try {
-                precondition.canProcess(GP_SUB1_AGENT_NAME, 
OFFSET_NOT_PRESENT, 2);
-            } catch (Throwable t) {
-                exHolder.set(t);
-            }
-        });
-        th.start();
-        precondition.deactivate();
-        Throwable ex = Awaitility.await().until(exHolder::get, notNullValue());
-        assertThat(ex, instanceOf(IllegalStateException.class));
-    }
-    
-    @Test(expected = TimeoutException.class)
     public void testCleanup() throws InterruptedException, TimeoutException {
         simulateMessage(GP_SUB1_AGENT_NAME, 1002, 
PackageStatusMessage.Status.IMPORTED);
-        assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
+        Decision res = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002);
+        assertThat(res, equalTo(Decision.ACCEPT));
         
         // Cleanup
         precondition.run();
         
-        // Should time out because after cleanup message is not present anymore
-        precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1);
+        Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002);
+        assertThat(res2, equalTo(Decision.WAIT));
     }
     
     @Test
@@ -137,9 +114,9 @@ public class StagingPreconditionTest {
         simulateMessage(GP_SUB1_AGENT_NAME, 1001, 
PackageStatusMessage.Status.REMOVED);
         simulateMessage(GP_SUB1_AGENT_NAME, 1002, 
PackageStatusMessage.Status.IMPORTED);
 
-        assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000, 1));
-        assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001, 1));
-        assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
+        assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000), 
equalTo(Decision.SKIP));
+        assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001), 
equalTo(Decision.SKIP));
+        assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002), 
equalTo(Decision.ACCEPT));
     }
 
     private void simulateMessage(String subAgentName, long pkgOffset, 
PackageStatusMessage.Status status) {
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 957062e..9867412 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -39,7 +39,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.LongStream;
 
-import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.metrics.Counter;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
@@ -52,7 +51,6 @@ import 
org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
@@ -73,8 +71,6 @@ public class PubQueueCacheTest {
 
     private static final String TOPIC = "package_topic";
 
-    private static final String PUB_SLING_ID = 
"79fd948e-9435-4128-b42f-32327ba21df3";
-
     private static final String PUB_AGENT_NAME_1 = "pubAgentName1";
 
     private static final String PUB_AGENT_NAME_2 = "pubAgentName2";
@@ -110,8 +106,6 @@ public class PubQueueCacheTest {
     @Mock
     private Closeable poller;
 
-    private ResourceResolverFactory resolverFactory = new 
MockResourceResolverFactory();
-
     private PubQueueCache cache;
 
     private ExecutorService executor;
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java
index f95c470..cd0dc5d 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.assertEquals;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.distribution.journal.impl.shared.LocalStore;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Test;
 
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java
index 8e7adb9..8229ad3 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java
@@ -22,8 +22,6 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.sling.distribution.journal.impl.shared.PackageRetries;
-
 public class PackageRetriesTest {
 
     @Test
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index d21c41a..87cba08 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -22,10 +22,8 @@ import static 
org.apache.sling.distribution.agent.DistributionAgentState.IDLE;
 import static 
org.apache.sling.distribution.agent.DistributionAgentState.RUNNING;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
@@ -47,6 +45,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
+import 
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
@@ -57,6 +56,7 @@ import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.api.resource.ResourceUtil;
@@ -251,9 +251,7 @@ public class SubscriberTest {
         packageHandler.handle(info, message);
         waitSubscriber(RUNNING);
         waitSubscriber(IDLE);
-        try (ResourceResolver resolver = 
resolverFactory.getServiceResourceResolver(null)) {
-            assertThat(resolver.getResource("/test"), nullValue());
-        }
+        getResource("/test");
     }
 
     @Test
@@ -295,7 +293,7 @@ public class SubscriberTest {
 
         packageHandler.handle(info, message);
         waitSubscriber(RUNNING);
-        when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), 
anyInt())).thenReturn(false);
+        when(precondition.canProcess(eq(SUB1_AGENT_NAME), 
eq(11))).thenReturn(Decision.SKIP);
 
         try {
             waitSubscriber(IDLE);
@@ -304,7 +302,7 @@ public class SubscriberTest {
 
         }
 
-        when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), 
anyInt())).thenReturn(true);
+        when(precondition.canProcess(eq(SUB1_AGENT_NAME), 
eq(11))).thenReturn(Decision.ACCEPT);
         waitSubscriber(IDLE);
 
     }
@@ -323,6 +321,12 @@ public class SubscriberTest {
         sem.release();
     }
 
+    private Resource getResource(String path) throws LoginException {
+        try (ResourceResolver resolver = 
resolverFactory.getServiceResourceResolver(null)) {
+            return resolver.getResource(path);
+        }
+    }
+
     private void initSubscriber() {
         initSubscriber(Collections.emptyMap());
     }
@@ -375,7 +379,7 @@ public class SubscriberTest {
 
     private void assumeNoPrecondition() {
         try {
-            when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), 
anyInt())).thenReturn(true);
+            when(precondition.canProcess(eq(SUB1_AGENT_NAME), 
anyLong())).thenReturn(Decision.ACCEPT);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -383,8 +387,8 @@ public class SubscriberTest {
 
     private void assumeWaitingForPrecondition(Semaphore sem) {
         try {
-            when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), 
anyInt()))
-                .thenAnswer(invocation -> sem.tryAcquire(10000, 
TimeUnit.SECONDS));
+            when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong()))
+                .thenAnswer(invocation -> sem.tryAcquire(10000, 
TimeUnit.SECONDS) ? Decision.ACCEPT : Decision.SKIP);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }

Reply via email to