This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-9472-2 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 2d5e7c7688e3d1c1aac80aea40946bd8cebc0f11 Author: Christian Schneider <[email protected]> AuthorDate: Fri Jun 26 16:47:05 2020 +0200 SLING-9472 - Make Precondition return immediately --- .../impl/precondition/DefaultPrecondition.java | 4 +- .../journal/impl/precondition/Precondition.java | 5 +- .../impl/precondition/StagingPrecondition.java | 48 ++------------------ .../impl/subscriber/DistributionSubscriber.java | 30 ++++++++++-- .../PreConditionTimeoutException.java} | 15 +++--- .../impl/precondition/DefaultPreconditionTest.java | 6 +-- .../precondition/PackageStatusWatcherTest.java | 2 +- .../impl/precondition/StagingPreconditionTest.java | 53 ++++++---------------- .../journal/impl/queue/impl/PubQueueCacheTest.java | 6 --- .../journal/impl/shared/LocalStoreTest.java | 1 - .../journal/impl/shared/PackageRetriesTest.java | 2 - .../journal/impl/subscriber/SubscriberTest.java | 24 ++++++---- 12 files changed, 76 insertions(+), 120 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java index dceed02..5144378 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java @@ -23,7 +23,7 @@ import org.osgi.service.component.annotations.Component; @Component(immediate = true, service = Precondition.class, property = { "name=default" }) public class DefaultPrecondition implements Precondition { @Override - public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) { - return true; + public Decision canProcess(String subAgentName, long pkgOffset) { + return Decision.ACCEPT; } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java index 69da7b4..d934431 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java @@ -30,9 +30,10 @@ public interface Precondition { * @param pkgOffset the offset of the package * @param timeoutSeconds max seconds to wait until returning * @throws TimeoutException if the timeout expired without being able to determine status - * @throws IllegalStateException if the precondition can't be evaluated + * @throws InterruptedException if the thread was interrupted and should shut down * @return true if the package can be processed; otherwise it returns false. */ - boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws TimeoutException; + Decision canProcess(String subAgentName, long pkgOffset); + enum Decision { ACCEPT, SKIP, WAIT}; } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java index 9b55848..be27fe2 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java @@ -21,8 +21,6 @@ package org.apache.sling.distribution.journal.impl.precondition; import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT; import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD; -import java.util.concurrent.TimeoutException; - import org.apache.commons.io.IOUtils; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.impl.shared.Topics; @@ -48,8 +46,6 @@ public class StagingPrecondition implements Precondition, Runnable { private static final Logger LOG = LoggerFactory.getLogger(StagingPrecondition.class); - private static final long STATUS_CHECK_DELAY_MS = 100; - @Reference private MessagingProvider messagingProvider; @@ -58,8 +54,6 @@ public class StagingPrecondition implements Precondition, Runnable { private volatile PackageStatusWatcher watcher; - private volatile boolean running = true; - @Activate public void activate() { watcher = new PackageStatusWatcher(messagingProvider, topics); @@ -68,40 +62,16 @@ public class StagingPrecondition implements Precondition, Runnable { @Deactivate public synchronized void deactivate() { - - /* - * Note that we don't interrupt blocking calls using Thread.interrupt() - * because interrupts can stop the Apache Oak repository. - * - * See SLING-9340, OAK-2609 and https://jackrabbit.apache.org/oak/docs/dos_and_donts.html - */ - IOUtils.closeQuietly(watcher); - running = false; } @Override - public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws TimeoutException { - if (timeoutSeconds < 1) { - throw new IllegalArgumentException(); - } - - // try to get the status for timeoutSeconds and then throw - for(int i = 0; running && i < timeoutSeconds * 10 ; i++) { - Status status = getStatus(subAgentName, pkgOffset); - if (status != null) { - return status == Status.IMPORTED; - } else { - delayStatusCheck(); - } + public Decision canProcess(String subAgentName, long pkgOffset) { + Status status = getStatus(subAgentName, pkgOffset); + if (status == null) { + return Decision.WAIT; } - - if (!running) { - throw new IllegalStateException("Staging precondition is shutting down"); - } - - throw new TimeoutException("Timeout waiting for package offset " + pkgOffset + " on status topic."); - + return status == Status.IMPORTED ? Decision.ACCEPT : Decision.SKIP; } private synchronized Status getStatus(String subAgentName, long pkgOffset) { @@ -114,12 +84,4 @@ public class StagingPrecondition implements Precondition, Runnable { watcher = new PackageStatusWatcher(messagingProvider, topics); } - private static void delayStatusCheck() { - try { - Thread.sleep(STATUS_CHECK_DELAY_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index 9a5625b..8047df8 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -57,6 +57,7 @@ import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.precondition.Precondition; +import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; import org.apache.sling.distribution.journal.impl.shared.Topics; import org.apache.sling.distribution.journal.messages.PackageMessage; @@ -299,7 +300,7 @@ public class DistributionSubscriber { } try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) { - processQueueItem(item.get().getInfo(), item.get().getMessage()); + processQueueItem(item.get()); } finally { subscriberIdle.ifPresent(SubscriberIdle::idle); } @@ -344,9 +345,10 @@ public class DistributionSubscriber { return Optional.empty(); } - private void processQueueItem(MessageInfo info, PackageMessage queueItem) throws PersistenceException, LoginException, DistributionException, TimeoutException { + private void processQueueItem(FullMessage<PackageMessage> item) throws PersistenceException, LoginException, DistributionException, TimeoutException { + MessageInfo info = item.getInfo(); + PackageMessage pkgMsg = item.getMessage(); long offset = info.getOffset(); - PackageMessage pkgMsg = queueItem; boolean skip = shouldSkip(offset); subscriberIdle.ifPresent(SubscriberIdle::busy); if (skip) { @@ -360,7 +362,9 @@ public class DistributionSubscriber { } private boolean shouldSkip(long offset) throws TimeoutException { - return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT); + boolean cleared = commandPoller.isCleared(offset); + Decision decision = waitPrecondition(offset); + return cleared || decision == Decision.SKIP; } private static void delay(long delayInMs) { @@ -371,4 +375,22 @@ public class DistributionSubscriber { } } + private Decision waitPrecondition(long offset) { + Decision decision = Precondition.Decision.WAIT; + long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT * 1000; + while (decision == Decision.WAIT && System.currentTimeMillis() < endTime) { + decision = precondition.canProcess(subAgentName, offset); + if (decision == Decision.WAIT) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return Decision.SKIP; + } + } else { + return decision; + } + } + throw new PreConditionTimeoutException("Timeout waiting for package offset " + offset + " on status topic."); + } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java similarity index 67% copy from src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java copy to src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java index dceed02..684ab9d 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java @@ -16,14 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sling.distribution.journal.impl.precondition; +package org.apache.sling.distribution.journal.impl.subscriber; -import org.osgi.service.component.annotations.Component; - -@Component(immediate = true, service = Precondition.class, property = { "name=default" }) -public class DefaultPrecondition implements Precondition { - @Override - public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) { - return true; +public class PreConditionTimeoutException extends RuntimeException { + public PreConditionTimeoutException(String msg) { + super(msg); } + + private static final long serialVersionUID = 6286011641627241560L; + } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java index f279265..c44bf13 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java @@ -21,13 +21,13 @@ package org.apache.sling.distribution.journal.impl.precondition; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; -import org.apache.sling.distribution.journal.impl.precondition.DefaultPrecondition; +import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision; import org.junit.Test; public class DefaultPreconditionTest { @Test public void testAlwaysTrue() { - boolean canProcess = new DefaultPrecondition().canProcess("any", 100, 10); - assertThat(canProcess, equalTo(true)); + Decision decision = new DefaultPrecondition().canProcess("any", 100); + assertThat(decision, equalTo(Decision.ACCEPT)); } } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java index f882f32..bf771ac 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java @@ -89,7 +89,7 @@ public class PackageStatusWatcherTest { void generateMessages(int begin, int end) { MessageHandler<PackageStatusMessage> handler = adapterCaptor.getValue().getHandler(); for (int i=begin; i<end; i++) { - handler.handle(new TestMessageInfo(TOPIC_NAME, 0, i, 0L), + handler.handle(new TestMessageInfo(TOPIC_NAME, 0, i, 0l), createStatusMessage(i)); } } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java index 3a31224..f64d24d 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java @@ -19,22 +19,18 @@ package org.apache.sling.distribution.journal.impl.precondition; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.Closeable; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.MessageHandler; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; +import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision; import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; import org.apache.sling.distribution.journal.impl.shared.Topics; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; @@ -56,7 +52,7 @@ public class StagingPreconditionTest { private static final String SUB1_SLING_ID = "sub1sling"; private static final String GP_SUB1_AGENT_NAME = "gpsub1agent"; private static final String PUB1_AGENT_NAME = "pub1agent"; - private static final Long OFFSET_NOT_PRESENT = 111111L; + private static final Long OFFSET_NOT_PRESENT = 111111l; @Mock private MessagingProvider clientProvider; @@ -88,47 +84,28 @@ public class StagingPreconditionTest { statusHandler = statusCaptor.getValue().getHandler(); } - @Test(expected = IllegalArgumentException.class) - public void testIllegalTimeout() throws InterruptedException, TimeoutException { - precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, -1); - } - - @Test(expected = TimeoutException.class) + @Test public void testNotYetProcessed() throws InterruptedException, TimeoutException { simulateMessage(OTHER_AGENT, 1002, PackageStatusMessage.Status.IMPORTED); - boolean res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT, 1); - assertThat(res, equalTo(true)); + Decision res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT); + assertThat(res, equalTo(Decision.WAIT)); + + Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT); + assertThat(res2, equalTo(Decision.WAIT)); - // We got no package for this agent. So this should time out - precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 1); } @Test - public void testDeactivateDuringCanProcess() { - AtomicReference<Throwable> exHolder = new AtomicReference<>(); - Thread th = new Thread(() -> { - try { - precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 2); - } catch (Throwable t) { - exHolder.set(t); - } - }); - th.start(); - precondition.deactivate(); - Throwable ex = Awaitility.await().until(exHolder::get, notNullValue()); - assertThat(ex, instanceOf(IllegalStateException.class)); - } - - @Test(expected = TimeoutException.class) public void testCleanup() throws InterruptedException, TimeoutException { simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED); - assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1)); + Decision res = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002); + assertThat(res, equalTo(Decision.ACCEPT)); // Cleanup precondition.run(); - // Should time out because after cleanup message is not present anymore - precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1); + Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002); + assertThat(res2, equalTo(Decision.WAIT)); } @Test @@ -137,9 +114,9 @@ public class StagingPreconditionTest { simulateMessage(GP_SUB1_AGENT_NAME, 1001, PackageStatusMessage.Status.REMOVED); simulateMessage(GP_SUB1_AGENT_NAME, 1002, PackageStatusMessage.Status.IMPORTED); - assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000, 1)); - assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001, 1)); - assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1)); + assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000), equalTo(Decision.SKIP)); + assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001), equalTo(Decision.SKIP)); + assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002), equalTo(Decision.ACCEPT)); } private void simulateMessage(String subAgentName, long pkgOffset, PackageStatusMessage.Status status) { diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java index 957062e..9867412 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java @@ -39,7 +39,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.stream.LongStream; -import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.commons.metrics.Counter; import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.MessageHandler; @@ -52,7 +51,6 @@ import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.apache.sling.distribution.queue.DistributionQueueItem; -import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory; import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; @@ -73,8 +71,6 @@ public class PubQueueCacheTest { private static final String TOPIC = "package_topic"; - private static final String PUB_SLING_ID = "79fd948e-9435-4128-b42f-32327ba21df3"; - private static final String PUB_AGENT_NAME_1 = "pubAgentName1"; private static final String PUB_AGENT_NAME_2 = "pubAgentName2"; @@ -110,8 +106,6 @@ public class PubQueueCacheTest { @Mock private Closeable poller; - private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory(); - private PubQueueCache cache; private ExecutorService executor; diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java index f95c470..cd0dc5d 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java @@ -29,7 +29,6 @@ import static org.junit.Assert.assertEquals; import org.apache.sling.api.resource.LoginException; import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.ResourceResolver; -import org.apache.sling.distribution.journal.impl.shared.LocalStore; import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory; import org.junit.Test; diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java index 8e7adb9..8229ad3 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java @@ -22,8 +22,6 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; -import org.apache.sling.distribution.journal.impl.shared.PackageRetries; - public class PackageRetriesTest { @Test diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java index d21c41a..87cba08 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java @@ -22,10 +22,8 @@ import static org.apache.sling.distribution.agent.DistributionAgentState.IDLE; import static org.apache.sling.distribution.agent.DistributionAgentState.RUNNING; import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.eq; @@ -47,6 +45,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.sling.distribution.journal.impl.precondition.Precondition; +import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; import org.apache.sling.distribution.journal.impl.shared.Topics; @@ -57,6 +56,7 @@ import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.api.resource.LoginException; import org.apache.sling.api.resource.PersistenceException; +import org.apache.sling.api.resource.Resource; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.api.resource.ResourceUtil; @@ -251,9 +251,7 @@ public class SubscriberTest { packageHandler.handle(info, message); waitSubscriber(RUNNING); waitSubscriber(IDLE); - try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) { - assertThat(resolver.getResource("/test"), nullValue()); - } + getResource("/test"); } @Test @@ -295,7 +293,7 @@ public class SubscriberTest { packageHandler.handle(info, message); waitSubscriber(RUNNING); - when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(false); + when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11))).thenReturn(Decision.SKIP); try { waitSubscriber(IDLE); @@ -304,7 +302,7 @@ public class SubscriberTest { } - when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11), anyInt())).thenReturn(true); + when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11))).thenReturn(Decision.ACCEPT); waitSubscriber(IDLE); } @@ -323,6 +321,12 @@ public class SubscriberTest { sem.release(); } + private Resource getResource(String path) throws LoginException { + try (ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null)) { + return resolver.getResource(path); + } + } + private void initSubscriber() { initSubscriber(Collections.emptyMap()); } @@ -375,7 +379,7 @@ public class SubscriberTest { private void assumeNoPrecondition() { try { - when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt())).thenReturn(true); + when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong())).thenReturn(Decision.ACCEPT); } catch (Exception e) { throw new RuntimeException(e); } @@ -383,8 +387,8 @@ public class SubscriberTest { private void assumeWaitingForPrecondition(Semaphore sem) { try { - when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(), anyInt())) - .thenAnswer(invocation -> sem.tryAcquire(10000, TimeUnit.SECONDS)); + when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong())) + .thenAnswer(invocation -> sem.tryAcquire(10000, TimeUnit.SECONDS) ? Decision.ACCEPT : Decision.SKIP); } catch (Exception e) { throw new RuntimeException(e); }
