This is an automated email from the ASF dual-hosted git repository. tmaret pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit d53f87e92859ea7ee8ac7787a886ea46ab20524e
Author: tmaret <[email protected]>
AuthorDate: Tue Apr 7 22:19:59 2020 +0200

    SLING-9340 - keep blocking code in the DistributionSubscriber class
---
 .../journal/impl/subscriber/BookKeeper.java        | 30 +++++++++++-----------
 .../impl/subscriber/DistributionSubscriber.java    | 18 ++++++++++++-
 2 files changed, 32 insertions(+), 16 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 53a7245..1c12690 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -246,30 +246,22 @@ public class BookKeeper implements Closeable {
     }
 
     /**
-     * Send status stored in a previous run if exists
-     * @throws InterruptedException
+     * @return {@code true} if the status has been sent ;
+     *         {@code false} otherwise.
      */
-    public void sendStoredStatus() throws InterruptedException {
-        try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
-            PackageStatus status = new PackageStatus(statusStore.load());
-            boolean sent = status.sent;
-            int retry = 0;
-            while (!sent) {
-                sent = sendStoredStatusOnce(status, retry++);
-            }
-        } catch (IOException e) {
-            log.warn("Error in timer close", e);
-        }
+    public boolean sendStoredStatus(int retry) {
+        PackageStatus status = new PackageStatus(statusStore.load());
+        return status.sent || sendStoredStatus(status, retry);
     }
 
-    private boolean sendStoredStatusOnce(PackageStatus status, int retry) throws InterruptedException {
+    private boolean sendStoredStatus(PackageStatus status, int retry) {
         try {
             sendStatusMessage(status);
             markStatusSent();
             return true;
         } catch (Exception e) {
             log.warn("Cannot send status (retry {})", retry, e);
-            Thread.sleep(RETRY_SEND_DELAY);
+            retryDelay();
             return false;
         }
     }
@@ -340,6 +332,14 @@ public class BookKeeper implements Closeable {
         return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, subService));
     }
 
+    static void retryDelay() {
+        try {
+            Thread.sleep(RETRY_SEND_DELAY);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
     class PackageStatus {
         final Status status;
         final Long offset;
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 74fc5e2..8228424 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -29,6 +29,7 @@ import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.
 import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.Hashtable;
@@ -365,7 +366,7 @@ public class DistributionSubscriber implements DistributionAgent {
 
     private void fetchAndProcessQueueItem() throws InterruptedException {
         try {
-            bookKeeper.sendStoredStatus();
+            boolean sent = blockingSendStoredStatus();
             DistributionQueueItem item = blockingPeekQueueItem();
 
             try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
@@ -389,6 +390,21 @@ public class DistributionSubscriber implements DistributionAgent {
         }
     }
 
+    /**
+     * Send status stored in a previous run if exists
+     *
+     * @return {@code true} if the status has been sent ;
+     *         {@code false} otherwise.
+     */
+    private boolean blockingSendStoredStatus() {
+        try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) {
+            for (int retry = 0 ; running && ! bookKeeper.sendStoredStatus(retry) ; retry++);
+        } catch (IOException e) {
+            LOG.warn("Error in timer close", e);
+        }
+        return running;
+    }
+
     private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
         while (true) {
             DistributionQueueItem queueItem = queueItemsBuffer.peek();
