This is an automated email from the ASF dual-hosted git repository. tmaret pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit a8c8aeff22cd9af1d5ba4e46109484f63a366f8b Author: tmaret <[email protected]> AuthorDate: Tue Apr 7 22:36:51 2020 +0200 SLING-9340 - stop processQueue using the running flag --- .../impl/subscriber/DistributionSubscriber.java | 42 +++++++++++++--------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index 8228424..fb4fbcd 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -20,6 +20,7 @@ package org.apache.sling.distribution.journal.impl.subscriber; import static java.lang.String.format; import static java.util.Arrays.asList; +import static java.util.Collections.emptyMap; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toSet; import static org.apache.sling.distribution.journal.HandlerAdapter.create; @@ -109,6 +110,8 @@ public class DistributionSubscriber implements DistributionAgent { private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet(); + private static final DistributionQueueItem STOPPED_ITEM = new DistributionQueueItem("stop-item", emptyMap()); + @Reference(name = "packageBuilder") private DistributionPackageBuilder packageBuilder; @@ -353,21 +356,23 @@ public class DistributionSubscriber implements DistributionAgent { private void processQueue() { LOG.info("Started Queue processor"); - while (!Thread.interrupted()) { - try { - fetchAndProcessQueueItem(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + while (running) { + fetchAndProcessQueueItem(); } LOG.info("Stopped Queue processor"); } - private void fetchAndProcessQueueItem() throws InterruptedException { + private void fetchAndProcessQueueItem() { try { - boolean sent = blockingSendStoredStatus(); + if (! blockingSendStoredStatus()) { + return; + } + DistributionQueueItem item = blockingPeekQueueItem(); + if (STOPPED_ITEM == item) { + return; + } try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) { processQueueItem(item); @@ -376,17 +381,13 @@ public class DistributionSubscriber implements DistributionAgent { } } catch (TimeoutException e) { - /** - * Precondition timed out. We only log this on info level as it is no error - */ + // Precondition timed out. We only log this on info level as it is no error LOG.info(e.getMessage()); - Thread.sleep(RETRY_DELAY); - } catch (InterruptedException e) { - throw e; + delay(RETRY_DELAY); } catch (Exception e) { // Catch all to prevent processing from stopping LOG.error("Error processing queue item", e); - Thread.sleep(RETRY_DELAY); + delay(RETRY_DELAY); } } @@ -406,7 +407,7 @@ public class DistributionSubscriber implements DistributionAgent { } private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException { - while (true) { + while (running) { DistributionQueueItem queueItem = queueItemsBuffer.peek(); if (queueItem != null) { return queueItem; @@ -414,6 +415,7 @@ public class DistributionSubscriber implements DistributionAgent { Thread.sleep(QUEUE_FETCH_DELAY); } } + return STOPPED_ITEM; } private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, InterruptedException, TimeoutException { @@ -435,4 +437,12 @@ public class DistributionSubscriber implements DistributionAgent { return commandPoller.isCleared(offset) || !precondition.canProcess(subAgentName, offset, PRECONDITION_TIMEOUT); } + private static void delay(long delayInMs) { + try { + Thread.sleep(delayInMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }
