This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new a5f4bf6 SLING-9472 - Make Precondition return immediately (#45)
a5f4bf6 is described below
commit a5f4bf60ed6937c7aa28d9be52e453b59c7f8c6b
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Jun 30 14:31:45 2020 +0200
SLING-9472 - Make Precondition return immediately (#45)
* SLING-9472 - Make Precondition return immediately
* SLING-9472 - Make test more robust
* SLING-9472 - Fix exception
* SLING-9472 - Add test case for precondition SKIP and small refactoring
* SLING-9472 - Test that status message is sent
* SLING-9472 - Remove unused componentReg to avoid NPE
* SLING-9472 - Return wait in case of interrupt
* SLING-9472 - Add test for skipped packages
* SLING-9472 - throw PreConditionTimeoutException on shutdown
* SLING-9472 - reinterrupt
---
.../impl/precondition/DefaultPrecondition.java | 4 +-
.../journal/impl/precondition/Precondition.java | 5 +-
.../impl/precondition/StagingPrecondition.java | 48 +--------
.../journal/impl/subscriber/BookKeeper.java | 12 ++-
.../impl/subscriber/DistributionSubscriber.java | 46 +++++---
.../PreConditionTimeoutException.java} | 15 ++-
.../journal/impl/subscriber/SubscriberIdle.java | 4 +-
.../impl/precondition/DefaultPreconditionTest.java | 6 +-
.../precondition/PackageStatusWatcherTest.java | 2 +-
.../impl/precondition/StagingPreconditionTest.java | 53 +++------
.../impl/publisher/DistributionPublisherTest.java | 2 +-
.../journal/impl/queue/impl/PubQueueCacheTest.java | 6 --
.../journal/impl/shared/LocalStoreTest.java | 1 -
.../journal/impl/shared/PackageRetriesTest.java | 2 -
.../journal/impl/subscriber/SubscriberTest.java | 119 ++++++++++++++-------
15 files changed, 162 insertions(+), 163 deletions(-)
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
index dceed02..5144378 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
@@ -23,7 +23,7 @@ import org.osgi.service.component.annotations.Component;
@Component(immediate = true, service = Precondition.class, property = {
"name=default" })
public class DefaultPrecondition implements Precondition {
@Override
- public boolean canProcess(String subAgentName, long pkgOffset, int
timeoutSeconds) {
- return true;
+ public Decision canProcess(String subAgentName, long pkgOffset) {
+ return Decision.ACCEPT;
}
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
index 69da7b4..e0a201c 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java
@@ -30,9 +30,10 @@ public interface Precondition {
* @param pkgOffset the offset of the package
* @param timeoutSeconds max seconds to wait until returning
* @throws TimeoutException if the timeout expired without being able to
determine status
- * @throws IllegalStateException if the precondition can't be evaluated
+ * @throws InterruptedException if the thread was interrupted and should
shut down
* @return true if the package can be processed; otherwise it returns
false.
*/
- boolean canProcess(String subAgentName, long pkgOffset, int
timeoutSeconds) throws TimeoutException;
+ Decision canProcess(String subAgentName, long pkgOffset);
+ enum Decision { ACCEPT, SKIP, WAIT}
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
index 9b55848..be27fe2 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java
@@ -21,8 +21,6 @@ package
org.apache.sling.distribution.journal.impl.precondition;
import static
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
import static
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
-import java.util.concurrent.TimeoutException;
-
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.impl.shared.Topics;
@@ -48,8 +46,6 @@ public class StagingPrecondition implements Precondition,
Runnable {
private static final Logger LOG =
LoggerFactory.getLogger(StagingPrecondition.class);
- private static final long STATUS_CHECK_DELAY_MS = 100;
-
@Reference
private MessagingProvider messagingProvider;
@@ -58,8 +54,6 @@ public class StagingPrecondition implements Precondition,
Runnable {
private volatile PackageStatusWatcher watcher;
- private volatile boolean running = true;
-
@Activate
public void activate() {
watcher = new PackageStatusWatcher(messagingProvider, topics);
@@ -68,40 +62,16 @@ public class StagingPrecondition implements Precondition,
Runnable {
@Deactivate
public synchronized void deactivate() {
-
- /*
- * Note that we don't interrupt blocking calls using Thread.interrupt()
- * because interrupts can stop the Apache Oak repository.
- *
- * See SLING-9340, OAK-2609 and
https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
- */
-
IOUtils.closeQuietly(watcher);
- running = false;
}
@Override
- public boolean canProcess(String subAgentName, long pkgOffset, int
timeoutSeconds) throws TimeoutException {
- if (timeoutSeconds < 1) {
- throw new IllegalArgumentException();
- }
-
- // try to get the status for timeoutSeconds and then throw
- for(int i = 0; running && i < timeoutSeconds * 10 ; i++) {
- Status status = getStatus(subAgentName, pkgOffset);
- if (status != null) {
- return status == Status.IMPORTED;
- } else {
- delayStatusCheck();
- }
+ public Decision canProcess(String subAgentName, long pkgOffset) {
+ Status status = getStatus(subAgentName, pkgOffset);
+ if (status == null) {
+ return Decision.WAIT;
}
-
- if (!running) {
- throw new IllegalStateException("Staging precondition is shutting
down");
- }
-
- throw new TimeoutException("Timeout waiting for package offset " +
pkgOffset + " on status topic.");
-
+ return status == Status.IMPORTED ? Decision.ACCEPT : Decision.SKIP;
}
private synchronized Status getStatus(String subAgentName, long pkgOffset)
{
@@ -114,12 +84,4 @@ public class StagingPrecondition implements Precondition,
Runnable {
watcher = new PackageStatusWatcher(messagingProvider, topics);
}
- private static void delayStatusCheck() {
- try {
- Thread.sleep(STATUS_CHECK_DELAY_MS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 0541b41..18fe950 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -73,11 +73,13 @@ import org.slf4j.MDC;
* agent on the leader instance.
*/
public class BookKeeper implements Closeable {
- private static final String KEY_OFFSET = "offset";
+ static final String STORE_TYPE_PACKAGE = "packages";
+ static final String STORE_TYPE_STATUS = "statuses";
+ static final String KEY_OFFSET = "offset";
+ static final int COMMIT_AFTER_NUM_SKIPPED = 10;
private static final String SUBSERVICE_IMPORTER = "importer";
private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
private static final int RETRY_SEND_DELAY = 1000;
- private static final int COMMIT_AFTER_NUM_SKIPPED = 10;
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final ResourceResolverFactory resolverFactory;
@@ -120,8 +122,8 @@ public class BookKeeper implements Closeable {
// Error queues are enabled when the number
// of retry attempts is limited ; disabled otherwise
this.errorQueueEnabled = (maxRetries >= 0);
- this.statusStore = new LocalStore(resolverFactory, "statuses",
subAgentName);
- this.processedOffsets = new LocalStore(resolverFactory, "packages",
subAgentName);
+ this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS,
subAgentName);
+ this.processedOffsets = new LocalStore(resolverFactory,
STORE_TYPE_PACKAGE, subAgentName);
}
/**
@@ -281,7 +283,7 @@ public class BookKeeper implements Closeable {
}
public long loadOffset() {
- return processedOffsets.load(KEY_OFFSET, -1L);
+ return processedOffsets.load(KEY_OFFSET, -1L);
}
public int getRetries(String pubAgentName) {
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 9a5625b..b739dcb 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -33,7 +33,6 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@@ -48,7 +47,6 @@ import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.distribution.agent.DistributionAgentState;
-import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
@@ -57,6 +55,7 @@ import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.precondition.Precondition;
+import
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
import
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.PackageMessage;
@@ -64,7 +63,6 @@ import
org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -124,8 +122,6 @@ public class DistributionSubscriber {
private Optional<SubscriberIdle> subscriberIdle;
- private ServiceRegistration<DistributionAgent> componentReg;
-
private Closeable packagePoller;
private CommandPoller commandPoller;
@@ -145,6 +141,7 @@ public class DistributionSubscriber {
private String pkgType;
private volatile boolean running = true;
+ private Thread queueThread;
@Activate
public void activate(SubscriberConfiguration config, BundleContext
context, Map<String, Object> properties) {
@@ -188,7 +185,7 @@ public class DistributionSubscriber {
commandPoller = new CommandPoller(messagingProvider, topics,
subSlingId, subAgentName, editable);
- startBackgroundThread(this::processQueue,
+ queueThread = startBackgroundThread(this::processQueue,
format("Queue Processor for Subscriber agent %s",
subAgentName));
int announceDelay =
PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
@@ -216,11 +213,16 @@ public class DistributionSubscriber {
* See SLING-9340, OAK-2609 and
https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
*/
- componentReg.unregister();
IOUtils.closeQuietly(announcer, bookKeeper,
packagePoller, commandPoller);
subscriberIdle.ifPresent(IOUtils::closeQuietly);
running = false;
+ try {
+ queueThread.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.info("Join interrupted");
+ }
String msg = String.format(
"Stopped Subscriber agent %s, subscribed to Publisher agent
names %s with package builder %s",
subAgentName, queueNames, pkgType);
@@ -299,12 +301,12 @@ public class DistributionSubscriber {
}
try (Timer.Context context =
distributionMetricsService.getProcessQueueItemDuration().time()) {
- processQueueItem(item.get().getInfo(),
item.get().getMessage());
+ processQueueItem(item.get());
} finally {
subscriberIdle.ifPresent(SubscriberIdle::idle);
}
- } catch (TimeoutException e) {
+ } catch (PreConditionTimeoutException e) {
// Precondition timed out. We only log this on info level as it is
no error
LOG.info(e.getMessage());
delay(RETRY_DELAY);
@@ -344,9 +346,10 @@ public class DistributionSubscriber {
return Optional.empty();
}
- private void processQueueItem(MessageInfo info, PackageMessage queueItem)
throws PersistenceException, LoginException, DistributionException,
TimeoutException {
+ private void processQueueItem(FullMessage<PackageMessage> item) throws
PersistenceException, LoginException, DistributionException {
+ MessageInfo info = item.getInfo();
+ PackageMessage pkgMsg = item.getMessage();
long offset = info.getOffset();
- PackageMessage pkgMsg = queueItem;
boolean skip = shouldSkip(offset);
subscriberIdle.ifPresent(SubscriberIdle::busy);
if (skip) {
@@ -359,8 +362,24 @@ public class DistributionSubscriber {
distributionMetricsService.getItemsBufferSize().decrement();
}
- private boolean shouldSkip(long offset) throws TimeoutException {
- return commandPoller.isCleared(offset) ||
!precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT);
+ private boolean shouldSkip(long offset) {
+ boolean cleared = commandPoller.isCleared(offset);
+ Decision decision = waitPrecondition(offset);
+ return cleared || decision == Decision.SKIP;
+ }
+
+ private Decision waitPrecondition(long offset) {
+ Decision decision = Precondition.Decision.WAIT;
+ long endTime = System.currentTimeMillis() + PRECONDITION_TIMEOUT *
1000;
+ while (decision == Decision.WAIT && System.currentTimeMillis() <
endTime && running) {
+ decision = precondition.canProcess(subAgentName, offset);
+ if (decision == Decision.WAIT) {
+ delay(100);
+ } else {
+ return decision;
+ }
+ }
+ throw new PreConditionTimeoutException("Timeout waiting for package
offset " + offset + " on status topic.");
}
private static void delay(long delayInMs) {
@@ -370,5 +389,4 @@ public class DistributionSubscriber {
Thread.currentThread().interrupt();
}
}
-
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
similarity index 67%
copy from
src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
copy to
src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
index dceed02..684ab9d 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPrecondition.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/PreConditionTimeoutException.java
@@ -16,14 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal.impl.precondition;
+package org.apache.sling.distribution.journal.impl.subscriber;
-import org.osgi.service.component.annotations.Component;
-
-@Component(immediate = true, service = Precondition.class, property = {
"name=default" })
-public class DefaultPrecondition implements Precondition {
- @Override
- public boolean canProcess(String subAgentName, long pkgOffset, int
timeoutSeconds) {
- return true;
+public class PreConditionTimeoutException extends RuntimeException {
+ public PreConditionTimeoutException(String msg) {
+ super(msg);
}
+
+ private static final long serialVersionUID = 6286011641627241560L;
+
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
index c7af419..3220a31 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -79,7 +79,9 @@ public class SubscriberIdle implements SystemReadyCheck,
Closeable {
public synchronized void idle() {
if (!isReady.get()) {
cancelSchedule();
- schedule = executor.schedule(this::ready, idleMillis,
TimeUnit.MILLISECONDS);
+ if (!executor.isShutdown()) {
+ schedule = executor.schedule(this::ready, idleMillis,
TimeUnit.MILLISECONDS);
+ }
}
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
index f279265..c44bf13 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/DefaultPreconditionTest.java
@@ -21,13 +21,13 @@ package
org.apache.sling.distribution.journal.impl.precondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
-import
org.apache.sling.distribution.journal.impl.precondition.DefaultPrecondition;
+import
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
import org.junit.Test;
public class DefaultPreconditionTest {
@Test
public void testAlwaysTrue() {
- boolean canProcess = new DefaultPrecondition().canProcess("any", 100,
10);
- assertThat(canProcess, equalTo(true));
+ Decision decision = new DefaultPrecondition().canProcess("any", 100);
+ assertThat(decision, equalTo(Decision.ACCEPT));
}
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index f882f32..bf771ac 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -89,7 +89,7 @@ public class PackageStatusWatcherTest {
void generateMessages(int begin, int end) {
MessageHandler<PackageStatusMessage> handler =
adapterCaptor.getValue().getHandler();
for (int i=begin; i<end; i++) {
- handler.handle(new TestMessageInfo(TOPIC_NAME, 0, i, 0L),
+ handler.handle(new TestMessageInfo(TOPIC_NAME, 0, i, 0l),
createStatusMessage(i));
}
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
index 3a31224..f64d24d 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java
@@ -19,22 +19,18 @@
package org.apache.sling.distribution.journal.impl.precondition;
import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.Closeable;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
+import
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
@@ -56,7 +52,7 @@ public class StagingPreconditionTest {
private static final String SUB1_SLING_ID = "sub1sling";
private static final String GP_SUB1_AGENT_NAME = "gpsub1agent";
private static final String PUB1_AGENT_NAME = "pub1agent";
- private static final Long OFFSET_NOT_PRESENT = 111111L;
+ private static final Long OFFSET_NOT_PRESENT = 111111l;
@Mock
private MessagingProvider clientProvider;
@@ -88,47 +84,28 @@ public class StagingPreconditionTest {
statusHandler = statusCaptor.getValue().getHandler();
}
- @Test(expected = IllegalArgumentException.class)
- public void testIllegalTimeout() throws InterruptedException,
TimeoutException {
- precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, -1);
- }
-
- @Test(expected = TimeoutException.class)
+ @Test
public void testNotYetProcessed() throws InterruptedException,
TimeoutException {
simulateMessage(OTHER_AGENT, 1002,
PackageStatusMessage.Status.IMPORTED);
- boolean res = precondition.canProcess(OTHER_AGENT, OFFSET_NOT_PRESENT,
1);
- assertThat(res, equalTo(true));
+ Decision res = precondition.canProcess(OTHER_AGENT,
OFFSET_NOT_PRESENT);
+ assertThat(res, equalTo(Decision.WAIT));
+
+ Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME,
OFFSET_NOT_PRESENT);
+ assertThat(res2, equalTo(Decision.WAIT));
- // We got no package for this agent. So this should time out
- precondition.canProcess(GP_SUB1_AGENT_NAME, OFFSET_NOT_PRESENT, 1);
}
@Test
- public void testDeactivateDuringCanProcess() {
- AtomicReference<Throwable> exHolder = new AtomicReference<>();
- Thread th = new Thread(() -> {
- try {
- precondition.canProcess(GP_SUB1_AGENT_NAME,
OFFSET_NOT_PRESENT, 2);
- } catch (Throwable t) {
- exHolder.set(t);
- }
- });
- th.start();
- precondition.deactivate();
- Throwable ex = Awaitility.await().until(exHolder::get, notNullValue());
- assertThat(ex, instanceOf(IllegalStateException.class));
- }
-
- @Test(expected = TimeoutException.class)
public void testCleanup() throws InterruptedException, TimeoutException {
simulateMessage(GP_SUB1_AGENT_NAME, 1002,
PackageStatusMessage.Status.IMPORTED);
- assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
+ Decision res = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002);
+ assertThat(res, equalTo(Decision.ACCEPT));
// Cleanup
precondition.run();
- // Should time out because after cleanup message is not present anymore
- precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1);
+ Decision res2 = precondition.canProcess(GP_SUB1_AGENT_NAME, 1002);
+ assertThat(res2, equalTo(Decision.WAIT));
}
@Test
@@ -137,9 +114,9 @@ public class StagingPreconditionTest {
simulateMessage(GP_SUB1_AGENT_NAME, 1001,
PackageStatusMessage.Status.REMOVED);
simulateMessage(GP_SUB1_AGENT_NAME, 1002,
PackageStatusMessage.Status.IMPORTED);
- assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000, 1));
- assertFalse(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001, 1));
- assertTrue(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002, 1));
+ assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1000),
equalTo(Decision.SKIP));
+ assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1001),
equalTo(Decision.SKIP));
+ assertThat(precondition.canProcess(GP_SUB1_AGENT_NAME, 1002),
equalTo(Decision.ACCEPT));
}
private void simulateMessage(String subAgentName, long pkgOffset,
PackageStatusMessage.Status status) {
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index fcaddae..78ddc6d 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -268,7 +268,7 @@ public class DistributionPublisherTest {
Counter counter = new TestCounter();
when(distributionMetricsService.getQueueAccessErrorCount()).thenReturn(counter);
try {
- DistributionQueue queue = publisher.getQueue(QUEUE_NAME);
+ publisher.getQueue(QUEUE_NAME);
fail("Expected exception not thrown");
} catch (RuntimeException expectedException) {
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 957062e..9867412 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -39,7 +39,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.LongStream;
-import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.metrics.Counter;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
@@ -52,7 +51,6 @@ import
org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
@@ -73,8 +71,6 @@ public class PubQueueCacheTest {
private static final String TOPIC = "package_topic";
- private static final String PUB_SLING_ID =
"79fd948e-9435-4128-b42f-32327ba21df3";
-
private static final String PUB_AGENT_NAME_1 = "pubAgentName1";
private static final String PUB_AGENT_NAME_2 = "pubAgentName2";
@@ -110,8 +106,6 @@ public class PubQueueCacheTest {
@Mock
private Closeable poller;
- private ResourceResolverFactory resolverFactory = new
MockResourceResolverFactory();
-
private PubQueueCache cache;
private ExecutorService executor;
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java
index f95c470..cd0dc5d 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/LocalStoreTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.assertEquals;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.distribution.journal.impl.shared.LocalStore;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.junit.Test;
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java
index 8e7adb9..8229ad3 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PackageRetriesTest.java
@@ -22,8 +22,6 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
-import org.apache.sling.distribution.journal.impl.shared.PackageRetries;
-
public class PackageRetriesTest {
@Test
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index ba955d2..5070b7e 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -21,14 +21,14 @@ package
org.apache.sling.distribution.journal.impl.subscriber;
import static org.apache.sling.distribution.agent.DistributionAgentState.IDLE;
import static
org.apache.sling.distribution.agent.DistributionAgentState.RUNNING;
import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -47,16 +47,21 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.sling.distribution.journal.impl.precondition.Precondition;
+import
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
import
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.LocalStore;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.impl.shared.Topics;
+import
org.apache.sling.distribution.journal.impl.subscriber.BookKeeper.PackageStatus;
import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ResourceUtil;
@@ -210,28 +215,46 @@ public class SubscriberTest {
@After
public void after() throws IOException {
subscriber.deactivate();
- verify(poller).close();
+ verify(poller, atLeastOnce()).close();
+ }
+
+ @Test
+ public void testReceiveNotSubscribed() throws DistributionException {
+ assumeNoPrecondition();
+ initSubscriber(ImmutableMap.of("agentNames", "dummy"));
+ assertThat(subscriber.getState(),
equalTo(DistributionAgentState.IDLE));
+
+ MessageInfo info = new TestMessageInfo("", 1, 100, 0);
+ PackageMessage message = BASIC_ADD_PACKAGE;
+
+ packageHandler.handle(info, message);
+ verify(packageBuilder,
timeout(1000).times(0)).installPackage(Mockito.any(ResourceResolver.class),
+ Mockito.any(ByteArrayInputStream.class));
+ assertThat(getStoredOffset(), nullValue());
+ for (int c=0; c < BookKeeper.COMMIT_AFTER_NUM_SKIPPED; c++) {
+ packageHandler.handle(info, message);
+ }
+ assertThat(getStoredOffset(), equalTo(100l));
}
@Test
public void testReceive() throws DistributionException {
assumeNoPrecondition();
initSubscriber();
-
assertThat(subscriber.getState(),
equalTo(DistributionAgentState.IDLE));
MessageInfo info = new TestMessageInfo("", 1, 0, 0);
-
PackageMessage message = BASIC_ADD_PACKAGE;
-
final Semaphore sem = new Semaphore(0);
when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class),
Mockito.any(ByteArrayInputStream.class))
).thenAnswer(new WaitFor(sem));
+
packageHandler.handle(info, message);
waitSubscriber(RUNNING);
sem.release();
+
waitSubscriber(IDLE);
verify(statusSender, times(0)).accept(anyObject());
}
@@ -241,23 +264,21 @@ public class SubscriberTest {
assumeNoPrecondition();
initSubscriber();
- try (ResourceResolver resolver =
resolverFactory.getServiceResourceResolver(null)) {
- ResourceUtil.getOrCreateResource(resolver, "/test","sling:Folder",
"sling:Folder", true);
- }
+ createResource("/test");
MessageInfo info = new TestMessageInfo("", 1, 0, 0);
-
PackageMessage message = BASIC_DEL_PACKAGE;
final Semaphore sem = new Semaphore(0);
when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class),
Mockito.any(ByteArrayInputStream.class))
).thenAnswer(new WaitFor(sem));
+
packageHandler.handle(info, message);
+
waitSubscriber(RUNNING);
sem.release();
+
waitSubscriber(IDLE);
- try (ResourceResolver resolver =
resolverFactory.getServiceResourceResolver(null)) {
- assertThat(resolver.getResource("/test"), nullValue());
- }
+ assertThat(getResource("/test"), nullValue());
}
@Test
@@ -267,53 +288,57 @@ public class SubscriberTest {
MessageInfo info = new TestMessageInfo("", 1, 0, 0);
PackageMessage message = BASIC_ADD_PACKAGE;
-
when(packageBuilder.installPackage(Mockito.any(ResourceResolver.class),
Mockito.any(ByteArrayInputStream.class))
).thenThrow(new RuntimeException("Expected"));
packageHandler.handle(info, message);
+
verify(statusSender, timeout(10000).times(1)).accept(anyObject());
}
@Test
public void testSendSuccessStatus() throws DistributionException,
InterruptedException {
assumeNoPrecondition();
+ // Only editable subscriber will send status
initSubscriber(ImmutableMap.of("editable", "true"));
MessageInfo info = new TestMessageInfo("", 1, 0, 0);
PackageMessage message = BASIC_ADD_PACKAGE;
packageHandler.handle(info, message);
+
waitSubscriber(IDLE);
-
verify(statusSender, timeout(10000).times(1)).accept(anyObject());
}
@Test
- public void testSkipOnRemovedStatus() throws DistributionException,
InterruptedException, TimeoutException {
- assumeNoPrecondition();
- initSubscriber();
+ public void testSkipBecauseOfPrecondition() throws DistributionException,
InterruptedException, TimeoutException {
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME),
anyLong())).thenReturn(Decision.SKIP);
+ initSubscriber(ImmutableMap.of("editable", "true"));
MessageInfo info = new TestMessageInfo("", 1, 11, 0);
PackageMessage message = BASIC_ADD_PACKAGE;
packageHandler.handle(info, message);
- waitSubscriber(RUNNING);
- when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11),
anyInt())).thenReturn(false);
-
- try {
- waitSubscriber(IDLE);
- fail("Cannot be IDLE without a validation status");
- } catch (Throwable t) {
-
- }
-
- when(precondition.canProcess(eq(SUB1_AGENT_NAME), eq(11),
anyInt())).thenReturn(true);
- waitSubscriber(IDLE);
-
+
+ await().until(this::getStatus,
equalTo(PackageStatusMessage.Status.REMOVED));
+ verify(statusSender, timeout(10000).times(1)).accept(anyObject());
}
@Test
+ public void testPreconditionTimeoutExceptionBecauseOfShutdown() throws
DistributionException, InterruptedException, TimeoutException, IOException {
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME),
anyLong())).thenReturn(Decision.WAIT);
+ initSubscriber(ImmutableMap.of("editable", "true"));
+ MessageInfo info = new TestMessageInfo("", 1, 11, 0);
+ PackageMessage message = BASIC_ADD_PACKAGE;
+
+ long startedAt = System.currentTimeMillis();
+ packageHandler.handle(info, message);
+ subscriber.deactivate();
+ assertThat("After deactivate precondition should time out quickly.",
System.currentTimeMillis() - startedAt, lessThan(1000l));
+ }
+
+ @Test
public void testReadyWhenWatingForPrecondition() {
Semaphore sem = new Semaphore(0);
assumeWaitingForPrecondition(sem);
@@ -326,6 +351,28 @@ public class SubscriberTest {
await("Should report ready").until(() ->
subscriberReadyStore.getReadyHolder(SUB1_AGENT_NAME).get());
sem.release();
}
+
+ private Long getStoredOffset() {
+ LocalStore store = new LocalStore(resolverFactory,
BookKeeper.STORE_TYPE_PACKAGE, SUB1_AGENT_NAME);
+ return store.load(BookKeeper.KEY_OFFSET, Long.class);
+ }
+
+ private Status getStatus() {
+ LocalStore statusStore = new LocalStore(resolverFactory,
BookKeeper.STORE_TYPE_STATUS, SUB1_AGENT_NAME);
+ return new PackageStatus(statusStore.load()).status;
+ }
+
+ private void createResource(String path) throws PersistenceException,
LoginException {
+ try (ResourceResolver resolver =
resolverFactory.getServiceResourceResolver(null)) {
+ ResourceUtil.getOrCreateResource(resolver, path,"sling:Folder",
"sling:Folder", true);
+ }
+ }
+
+ private Resource getResource(String path) throws LoginException {
+ try (ResourceResolver resolver =
resolverFactory.getServiceResourceResolver(null)) {
+ return resolver.getResource(path);
+ }
+ }
private void initSubscriber() {
initSubscriber(Collections.emptyMap());
@@ -348,7 +395,7 @@ public class SubscriberTest {
private void waitSubscriber(DistributionAgentState expectedState) {
await().until(subscriber::getState, equalTo(expectedState));
}
-
+
private void mockMetrics() {
Histogram histogram = Mockito.mock(Histogram.class);
Counter counter = Mockito.mock(Counter.class);
@@ -379,7 +426,7 @@ public class SubscriberTest {
private void assumeNoPrecondition() {
try {
- when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(),
anyInt())).thenReturn(true);
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME),
anyLong())).thenReturn(Decision.ACCEPT);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -387,8 +434,8 @@ public class SubscriberTest {
private void assumeWaitingForPrecondition(Semaphore sem) {
try {
- when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong(),
anyInt()))
- .thenAnswer(invocation -> sem.tryAcquire(10000,
TimeUnit.SECONDS));
+ when(precondition.canProcess(eq(SUB1_AGENT_NAME), anyLong()))
+ .thenAnswer(invocation -> sem.tryAcquire(10000,
TimeUnit.SECONDS) ? Decision.ACCEPT : Decision.SKIP);
} catch (Exception e) {
throw new RuntimeException(e);
}