This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-10112 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 79e74b9aaa204807ec82db07f571d79a40ec9fe7 Author: Christian Schneider <[email protected]> AuthorDate: Mon Feb 1 22:35:02 2021 +0100 SLING-10112 - Only start import when CommandPoller idle --- .../journal/impl/subscriber/CommandPoller.java | 11 +++- .../impl/subscriber/DistributionSubscriber.java | 13 +++-- .../journal/impl/subscriber/CommandPollerTest.java | 2 +- .../journal/impl/subscriber/SubscriberTest.java | 66 +++++++++++++++++++--- 4 files changed, 79 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java index e602fee..9bcef69 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java @@ -21,6 +21,7 @@ package org.apache.sling.distribution.journal.impl.subscriber; import static org.apache.sling.distribution.journal.HandlerAdapter.create; import java.io.Closeable; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.IOUtils; @@ -38,11 +39,13 @@ public class CommandPoller implements Closeable { private final String subSlingId; private final String subAgentName; private final Closeable poller; + private final IdleCheck idleCheck; private final AtomicLong clearOffset = new AtomicLong(-1); - public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName) { + public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName, int idleMillies) { this.subSlingId = subSlingId; this.subAgentName = subAgentName; + this.idleCheck = new SubscriberIdle(idleMillies, new AtomicBoolean()); this.poller = messagingProvider.createPoller( topics.getCommandTopic(), Reset.earliest, @@ -55,12 +58,14 @@ public class CommandPoller implements Closeable { } private void handleCommandMessage(MessageInfo info, ClearCommand message) { + idleCheck.busy(0); if (!subSlingId.equals(message.getSubSlingId()) || !subAgentName.equals(message.getSubAgentName())) { LOG.debug("Skip command for subSlingId {}", message.getSubSlingId()); return; } handleClearCommand(message.getOffset()); + idleCheck.idle(); } private void handleClearCommand(long offset) { @@ -77,4 +82,8 @@ public class CommandPoller implements Closeable { public void close() { IOUtils.closeQuietly(poller); } + + public boolean isIdle() { + return idleCheck.isIdle(); + } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index 67512b2..8b1baec 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -84,6 +84,7 @@ public class DistributionSubscriber { private static final int PRECONDITION_TIMEOUT = 60; static int RETRY_DELAY = 5000; static int QUEUE_FETCH_DELAY = 1000; + private static final long COMMAND_NOT_IDLE_DELAY_MS = 200; private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class); @@ -117,7 +118,7 @@ public class DistributionSubscriber { private volatile Closeable idleReadyCheck; //NOSONAR private volatile IdleCheck idleCheck; //NOSONAR - + private Closeable packagePoller; private volatile CommandPoller commandPoller; //NOSONAR @@ -152,13 +153,13 @@ public class DistributionSubscriber { requireNonNull(precondition); requireNonNull(bookKeeperFactory); + Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS); if (config.editable()) { - commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName); + commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, idleMillies); } if (config.subscriberIdleCheck()) { // Unofficial config (currently just for test) - Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS); AtomicBoolean readyHolder = subscriberReadyStore.getReadyHolder(subAgentName); idleCheck = new SubscriberIdle(idleMillies, readyHolder); @@ -275,7 +276,11 @@ public class DistributionSubscriber { LOG.info("Started Queue processor"); while (running) { try { - fetchAndProcessQueueItem(); + if (commandPoller == null || commandPoller.isIdle()) { + fetchAndProcessQueueItem(); + } else { + delay(COMMAND_NOT_IDLE_DELAY_MS); + } } catch (PreConditionTimeoutException e) { // Precondition timed out. We only log this on info level as it is no error LOG.info(e.getMessage()); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java index c4be949..4816f14 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java @@ -147,7 +147,7 @@ public class CommandPollerTest { Mockito.eq(Reset.earliest), handlerCaptor.capture())) .thenReturn(poller); - commandPoller = new CommandPoller(clientProvider, topics, SUB_SLING_ID, SUB_AGENT_NAME); + commandPoller = new CommandPoller(clientProvider, topics, SUB_SLING_ID, SUB_AGENT_NAME, 1000); commandHandler = handlerCaptor.getValue().getHandler(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java index 66dac34..d920205 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java @@ -71,6 +71,7 @@ import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory; import org.apache.sling.distribution.journal.bookkeeper.LocalStore; import org.apache.sling.distribution.journal.impl.precondition.Precondition; import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision; +import org.apache.sling.distribution.journal.messages.ClearCommand; import org.apache.sling.distribution.journal.messages.DiscoveryMessage; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; @@ -181,15 +182,25 @@ public class SubscriberTest { @Captor private ArgumentCaptor<HandlerAdapter<PackageMessage>> packageCaptor; + + @Captor + private ArgumentCaptor<HandlerAdapter<ClearCommand>> commandCaptor; + + @Captor + private ArgumentCaptor<PackageStatusMessage> statusMessageCaptor; @Mock private Closeable poller; @Mock + private Closeable commandPoller; + + @Mock private ServiceRegistration<DistributionAgent> reg; private MessageHandler<PackageMessage> packageHandler; - + + private MessageHandler<ClearCommand> commandHandler; @Before public void before() { @@ -208,11 +219,18 @@ public class SubscriberTest { when(clientProvider.<DiscoveryMessage>createSender(Mockito.eq(topics.getDiscoveryTopic()))).thenReturn(discoverySender); when(clientProvider.createPoller( - Mockito.anyString(), + Mockito.eq(topics.getPackageTopic()), Mockito.eq(Reset.earliest), Mockito.anyString(), packageCaptor.capture())) .thenReturn(poller); + + when(clientProvider.createPoller( + Mockito.eq(topics.getCommandTopic()), + Mockito.eq(Reset.earliest), + commandCaptor.capture())) + .thenReturn(commandPoller); + when(context.registerService(Mockito.any(Class.class), eq(subscriber), Mockito.any(Dictionary.class))).thenReturn(reg); // you should call initSubscriber in each test method @@ -261,10 +279,10 @@ public class SubscriberTest { sem.release(); waitSubscriber(IDLE); - verify(statusSender, times(0)).accept(anyObject()); + verifyNoStatusMessageSent(); } - @Test + @Test public void testReceiveDelete() throws DistributionException, LoginException, PersistenceException { assumeNoPrecondition(); initSubscriber(); @@ -281,9 +299,29 @@ public class SubscriberTest { sem.release(); waitSubscriber(IDLE); + verifyNoStatusMessageSent(); assertThat(getResource("/test"), nullValue()); } + /** + * We must make sure that a delete command in the queue is honored if possible. + */ + @Test + public void testReceiveNotProcessedWhenDeleteCommandLate() throws Exception { + assumeNoPrecondition(); + initSubscriber(ImmutableMap.of("editable", "true")); + + MessageInfo info = createInfo(0l); + PackageMessage message = BASIC_ADD_PACKAGE; + packageHandler.handle(info, message); + + ClearCommand clearCommand = ClearCommand.builder().offset(10).subAgentName(SUB1_AGENT_NAME).subSlingId(SUB1_SLING_ID).build(); + Thread.sleep(500); + commandHandler.handle(info, clearCommand); + + verifyStatusMessageSentWithStatus(Status.REMOVED); + } + @Test public void testSendFailedStatus() throws DistributionException { assumeNoPrecondition(); @@ -295,7 +333,7 @@ public class SubscriberTest { PackageMessage message = BASIC_ADD_PACKAGE; packageHandler.handle(info, message); - verify(statusSender, timeout(10000).times(1)).accept(anyObject()); + verifyStatusMessageSentWithStatus(Status.REMOVED_FAILED); } @Test @@ -309,7 +347,7 @@ public class SubscriberTest { packageHandler.handle(info, message); waitSubscriber(IDLE); - verify(statusSender, timeout(10000).times(1)).accept(anyObject()); + verifyStatusMessageSentWithStatus(Status.IMPORTED); } @Test @@ -322,7 +360,7 @@ public class SubscriberTest { packageHandler.handle(info, message); await().until(this::getStatus, equalTo(PackageStatusMessage.Status.REMOVED)); - verify(statusSender, timeout(10000).times(1)).accept(anyObject()); + verifyStatusMessageSentWithStatus(Status.REMOVED); } @Test @@ -354,6 +392,17 @@ public class SubscriberTest { sem.release(); } + private void verifyNoStatusMessageSent() { + verify(statusSender, times(0)).accept(anyObject()); + } + + private PackageStatusMessage verifyStatusMessageSentWithStatus(Status expectedStatus) { + verify(statusSender, timeout(10000).times(1)).accept(statusMessageCaptor.capture()); + PackageStatusMessage statusMessage = statusMessageCaptor.getValue(); + assertThat(statusMessage.getStatus(), equalTo(expectedStatus)); + return statusMessage; + } + private OngoingStubbing<DistributionPackageInfo> whenInstallPackage() throws DistributionException { return when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class), Mockito.any(ByteArrayInputStream.class))); } @@ -401,6 +450,9 @@ public class SubscriberTest { subscriber.bookKeeperFactory = bookKeeperFactory; subscriber.activate(config, context, props); packageHandler = packageCaptor.getValue().getHandler(); + if ("true".equals(props.get("editable"))) { + commandHandler = commandCaptor.getValue().getHandler(); + } } private void waitSubscriber(DistributionAgentState expectedState) {
