This is an automated email from the ASF dual-hosted git repository. tmaret pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit b92195d39a0019dfd65e7abbc542b518b90e871e Author: tmaret <[email protected]> AuthorDate: Tue Apr 7 22:54:45 2020 +0200 SLING-9340 - Precondition to raise ISE instead of IE --- .../journal/impl/precondition/Precondition.java | 4 ++-- .../impl/precondition/StagingPrecondition.java | 26 +++++++++++++++------- .../impl/subscriber/DistributionSubscriber.java | 6 +++-- .../impl/precondition/StagingPreconditionTest.java | 3 +-- 4 files changed, 25 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java index 3730475..69da7b4 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/Precondition.java @@ -30,9 +30,9 @@ public interface Precondition { * @param pkgOffset the offset of the package * @param timeoutSeconds max seconds to wait until returning * @throws TimeoutException if the timeout expired without being able to determine status - * @throws InterruptedException if the thread was interrupted and should shut down + * @throws IllegalStateException if the precondition can't be evaluated * @return true if the package can be processed; otherwise it returns false. */ - boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException; + boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws TimeoutException; } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java index 2272888..c0e3d48 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/StagingPrecondition.java @@ -48,6 +48,8 @@ public class StagingPrecondition implements Precondition, Runnable { private static final Logger LOG = LoggerFactory.getLogger(StagingPrecondition.class); + private static final long STATUS_CHECK_DELAY_MS = 100; + @Reference private MessagingProvider messagingProvider; @@ -71,27 +73,27 @@ public class StagingPrecondition implements Precondition, Runnable { } @Override - public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws InterruptedException, TimeoutException { + public boolean canProcess(String subAgentName, long pkgOffset, int timeoutSeconds) throws TimeoutException { if (timeoutSeconds < 1) { throw new IllegalArgumentException(); } // try to get the status for timeoutSeconds and then throw - for(int i=0; i < timeoutSeconds * 10; i++) { + for(int i = 0; running && i < timeoutSeconds * 10 ; i++) { Status status = getStatus(subAgentName, pkgOffset); - if (status != null) { return status == Status.IMPORTED; } else { - Thread.sleep(100); - } - - if (!running) { - throw new InterruptedException("Staging precondition is shutting down"); + delayStatusCheck(); } } + if (!running) { + throw new IllegalStateException("Staging precondition is shutting down"); + } + throw new TimeoutException("Timeout waiting for package offset " + pkgOffset + " on status topic."); + } private synchronized Status getStatus(String subAgentName, long pkgOffset) { @@ -104,4 +106,12 @@ public class StagingPrecondition implements Precondition, Runnable { watcher = new PackageStatusWatcher(messagingProvider, topics); } + private static void delayStatusCheck() { + try { + Thread.sleep(STATUS_CHECK_DELAY_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index fb4fbcd..93f528b 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -384,6 +384,8 @@ public class DistributionSubscriber implements DistributionAgent { // Precondition timed out. We only log this on info level as it is no error LOG.info(e.getMessage()); delay(RETRY_DELAY); + } catch (IllegalStateException e) { + throw e; } catch (Exception e) { // Catch all to prevent processing from stopping LOG.error("Error processing queue item", e); @@ -418,7 +420,7 @@ public class DistributionSubscriber implements DistributionAgent { return STOPPED_ITEM; } - private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException { + private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, TimeoutException { long offset = queueItem.get(RECORD_OFFSET, Long.class); PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class); boolean skip = shouldSkip(offset); @@ -433,7 +435,7 @@ public class DistributionSubscriber implements DistributionAgent { distributionMetricsService.getItemsBufferSize().decrement(); } - private boolean shouldSkip(long offset) throws InterruptedException, TimeoutException { + private boolean shouldSkip(long offset) throws TimeoutException { return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java index eab0479..6a9f893 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/StagingPreconditionTest.java @@ -35,7 +35,6 @@ import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.MessageHandler; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; -import org.apache.sling.distribution.journal.impl.precondition.StagingPrecondition; import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; import org.apache.sling.distribution.journal.impl.shared.Topics; import org.apache.sling.distribution.journal.messages.Messages.PackageStatusMessage; @@ -117,7 +116,7 @@ public class StagingPreconditionTest { th.start(); precondition.deactivate(); Throwable ex = Awaitility.await().until(() -> exHolder.get(), notNullValue()); - assertThat(ex, instanceOf(InterruptedException.class)); + assertThat(ex, instanceOf(IllegalStateException.class)); } @Test(expected = TimeoutException.class)
