This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 697c9dd SLING-10112 - Do not wait for command poller to be idle (#86)
697c9dd is described below
commit 697c9dd7ba83c5bf10e6a148aba971db1e8e7832
Author: Christian Schneider <[email protected]>
AuthorDate: Thu Jul 15 15:51:15 2021 +0200
SLING-10112 - Do not wait for command poller to be idle (#86)
* SLING-10112 - Do not wait for command poller to be idle
* SLING-10112 - Fix import
---
.../journal/impl/subscriber/CommandPoller.java | 9 +--------
.../impl/subscriber/DistributionSubscriber.java | 10 ++--------
.../journal/impl/subscriber/SubscriberIdle.java | 4 ----
.../journal/impl/subscriber/CommandPollerTest.java | 2 +-
.../journal/impl/subscriber/SubscriberIdleTest.java | 6 ++++--
.../journal/impl/subscriber/SubscriberTest.java | 19 -------------------
6 files changed, 8 insertions(+), 42 deletions(-)
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index 22734a6..fcdb2b6 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -38,14 +38,12 @@ public class CommandPoller implements Closeable {
private final String subSlingId;
private final String subAgentName;
private final Closeable poller;
- private final IdleCheck idleCheck;
private final AtomicLong clearOffset = new AtomicLong(-1);
private final Runnable callback;
- public CommandPoller(MessagingProvider messagingProvider, Topics topics,
String subSlingId, String subAgentName, int idleMillies, Runnable callback) {
+ public CommandPoller(MessagingProvider messagingProvider, Topics topics,
String subSlingId, String subAgentName, Runnable callback) {
this.subSlingId = subSlingId;
this.subAgentName = subAgentName;
- this.idleCheck = new SubscriberIdle(idleMillies,
SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS);
this.callback = callback;
this.poller = messagingProvider.createPoller(
topics.getCommandTopic(),
@@ -59,14 +57,12 @@ public class CommandPoller implements Closeable {
}
private void handleCommandMessage(MessageInfo info, ClearCommand command) {
- idleCheck.busy(0);
if (!subSlingId.equals(command.getSubSlingId()) ||
!subAgentName.equals(command.getSubAgentName())) {
LOG.debug("Skip command for subSlingId {}",
command.getSubSlingId());
return;
}
handleClearCommand(command);
- idleCheck.idle();
callback.run();
}
@@ -85,7 +81,4 @@ public class CommandPoller implements Closeable {
IOUtils.closeQuietly(poller);
}
- public boolean isIdle() {
- return idleCheck.isIdle();
- }
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 4d7d841..9e676d9 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -20,7 +20,6 @@ package org.apache.sling.distribution.journal.impl.subscriber;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toSet;
@@ -95,7 +94,6 @@ public class DistributionSubscriber {
static long RETRY_DELAY = SECONDS.toMillis(5);
static long MAX_RETRY_DELAY = MINUTES.toMillis(15);
static long QUEUE_FETCH_DELAY = SECONDS.toMillis(1);
- private static final long COMMAND_NOT_IDLE_DELAY =
MILLISECONDS.toMillis(200);
private static final Supplier<LongSupplier> catchAllDelays = () ->
exponential(RETRY_DELAY, MAX_RETRY_DELAY);
private static final Logger LOG =
LoggerFactory.getLogger(DistributionSubscriber.class);
@@ -171,7 +169,7 @@ public class DistributionSubscriber {
Integer idleMillies = (Integer) properties.getOrDefault("idleMillies",
SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
if (config.editable()) {
- commandPoller = new CommandPoller(messagingProvider, topics,
subSlingId, subAgentName, idleMillies, delay::signal);
+ commandPoller = new CommandPoller(messagingProvider, topics,
subSlingId, subAgentName, delay::signal);
}
if (config.subscriberIdleCheck()) {
@@ -304,11 +302,7 @@ public class DistributionSubscriber {
LOG.info("Started Queue processor");
while (running) {
try {
- if (commandPoller == null || commandPoller.isIdle()) {
- fetchAndProcessQueueItem();
- } else {
- delay.await(COMMAND_NOT_IDLE_DELAY);
- }
+ fetchAndProcessQueueItem();
} catch (PreConditionTimeoutException e) {
// Precondition timed out. We only log this on info level as
it is no error
LOG.info(e.getMessage());
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
index afcd824..417b97d 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -40,10 +40,6 @@ public class SubscriberIdle implements IdleCheck {
private final ScheduledExecutorService executor;
private ScheduledFuture<?> schedule;
- public SubscriberIdle(int idleMillis, int forceIdleMillies) {
- this(idleMillis, forceIdleMillies, new AtomicBoolean());
- }
-
public SubscriberIdle(int idleMillis, int forceIdleMillies, AtomicBoolean
readyHolder) {
this.idleMillis = idleMillis;
this.isReady = readyHolder;
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index ae6b310..bb26102 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -157,7 +157,7 @@ public class CommandPollerTest {
Mockito.eq(Reset.earliest),
handlerCaptor.capture()))
.thenReturn(poller);
- commandPoller = new CommandPoller(clientProvider, topics,
SUB_SLING_ID, SUB_AGENT_NAME, 1000, callback);
+ commandPoller = new CommandPoller(clientProvider, topics,
SUB_SLING_ID, SUB_AGENT_NAME, callback);
commandHandler = handlerCaptor.getValue().getHandler();
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
index 471c119..2035bfe 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -22,6 +22,8 @@ import static
org.apache.sling.distribution.journal.impl.subscriber.SubscriberId
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -33,7 +35,7 @@ public class SubscriberIdleTest {
@Before
public void before() {
- idle = new SubscriberIdle(IDLE_MILLIES,
SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS);
+ idle = new SubscriberIdle(IDLE_MILLIES,
SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS, new AtomicBoolean());
}
@After
@@ -76,7 +78,7 @@ public class SubscriberIdleTest {
@Test
public void testStartIdle() throws InterruptedException {
- idle = new SubscriberIdle(IDLE_MILLIES,
SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS);
+ idle = new SubscriberIdle(IDLE_MILLIES,
SubscriberIdle.DEFAULT_FORCE_IDLE_MILLIS, new AtomicBoolean());
assertThat("Initial state", idle.isIdle(), equalTo(false));
Thread.sleep(IDLE_MILLIES * 2);
assertThat("State after time over idle limit", idle.isIdle(),
equalTo(true));
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 5a191d4..045b47a 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -354,25 +354,6 @@ public class SubscriberTest {
verifyNoStatusMessageSent();
}
- /**
- * We must make sure that a delete command in the queue is honored if
possible.
- */
- @Test
- public void testReceiveNotProcessedWhenDeleteCommandLate() throws
Exception {
- assumeNoPrecondition();
- initSubscriber(ImmutableMap.of("editable", "true"));
-
- MessageInfo info = createInfo(0l);
- PackageMessage message = BASIC_ADD_PACKAGE;
- packageHandler.handle(info, message);
-
- Thread.sleep(500);
- ClearCommand clearCommand =
ClearCommand.builder().offset(10).subAgentName(SUB1_AGENT_NAME).subSlingId(SUB1_SLING_ID).build();
- commandHandler.handle(info, clearCommand);
-
- verifyStatusMessageSentWithStatus(Status.REMOVED);
- }
-
@Test
public void testSendFailedStatus() throws DistributionException {
assumeNoPrecondition();