This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 33e2ba1  SLING-8415 - throw upon catching interrupted exception
33e2ba1 is described below

commit 33e2ba12094d7aac3f6dacdc578bbd22b1b9abbd
Author: tmaret <[email protected]>
AuthorDate: Sat May 11 22:33:43 2019 +0200

    SLING-8415 - throw upon catching interrupted exception
---
 .../journal/impl/queue/impl/PubQueueCache.java     | 62 ++++++++++------------
 .../impl/queue/impl/PubQueueCacheService.java      |  7 ++-
 2 files changed, 33 insertions(+), 36 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 89b90ca..89a001f 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -124,6 +124,8 @@ public class PubQueueCache {
      */
     private volatile boolean closed;
 
+    private Thread seeder;
+
 
     public PubQueueCache(MessagingProvider messagingProvider, EventAdmin 
eventAdmin, DistributionMetricsService distributionMetricsService, String 
topic) {
         this.messagingProvider = messagingProvider;
@@ -136,11 +138,11 @@ public class PubQueueCache {
                 Reset.latest,
                 create(PackageMessage.class, this::handlePackage));
 
-        RunnableUtil.startBackgroundThread(this::sendSeedingMessages, "queue 
seeding");
+        seeder = RunnableUtil.startBackgroundThread(this::sendSeedingMessages, 
"queue seeding");
     }
 
     @Nonnull
-    public OffsetQueue<DistributionQueueItem> getOffsetQueue(String 
pubAgentName, long minOffset) {
+    public OffsetQueue<DistributionQueueItem> getOffsetQueue(String 
pubAgentName, long minOffset) throws InterruptedException {
         fetchIfNeeded(minOffset);
         return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
     }
@@ -151,29 +153,29 @@ public class PubQueueCache {
 
     public void close() {
         closed = true;
+        seeder.interrupt();
         IOUtils.closeQuietly(tailPoller);
         jmxRegs.stream().forEach(IOUtils::closeQuietly);
     }
     
     private void sendSeedingMessages() {
-        LOG.info("Send seeding messages");
+        LOG.info("Start message seeder");
         MessageSender<PackageMessage> sender = 
messagingProvider.createSender();
-        while (! closed) {
+        while (! Thread.interrupted()) {
             PackageMessage pkgMsg = createTestMessage();
+            LOG.debug("Send seeding message");
             try {
-                LOG.debug("Send seeding message");
                 sender.send(topic, pkgMsg);
-                if (seeded.await(SEEDING_DELAY_MS, MILLISECONDS)) {
-                    LOG.info("Cache has been seeded");
-                    return;
-                }
             } catch (MessagingException e) {
-                LOG.info(e.getMessage());
-                sleep(1000);
-            } catch (InterruptedException ignore) {
-                // ignore
+                LOG.warn(e.getMessage(), e);
+            }
+            try {
+                Thread.sleep(SEEDING_DELAY_MS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
             }
         }
+        LOG.info("Stop message seeder");
     }
 
     private PackageMessage createTestMessage() {
@@ -192,7 +194,7 @@ public class PubQueueCache {
      *
      * @param requestedMinOffset the min offset to start fetching data from
      */
-    private void fetchIfNeeded(long requestedMinOffset) {
+    private void fetchIfNeeded(long requestedMinOffset) throws 
InterruptedException {
 
         // We wait on the cache to be seeded (at least one message handled)
         // before computing potential missing offsets.
@@ -227,8 +229,6 @@ public class PubQueueCache {
 
                     fetch(requestedMinOffset, cachedMinOffset);
                 }
-            } catch (Exception e) {
-                throw new RuntimeException(String.format("Failed to fetch 
offsets [%s,%s[, %s", requestedMinOffset, cachedMinOffset, e.getMessage()), e);
             } finally {
                 headPollerLock.unlock();
             }
@@ -249,19 +249,15 @@ public class PubQueueCache {
         updateMinOffset(requestedMinOffset);
     }
 
-    private void waitSeeded() {
-        while (! closed) {
-            try {
-                if (seeded.await(SEEDING_DELAY_MS, MILLISECONDS)) {
-                    return;
-                } else {
-                    LOG.debug("Waiting for seeded cache");
-                }
-            } catch (InterruptedException ignore) {
-                // ignore
+    private void waitSeeded() throws InterruptedException {
+        while (!closed) {
+            if (seeded.await(SEEDING_DELAY_MS, MILLISECONDS)) {
+                return;
+            } else {
+                LOG.debug("Waiting for seeded cache");
             }
         }
-        throw new RuntimeException();
+        throw new InterruptedException("Cache is closed");
     }
 
     protected long getMinOffset() {
@@ -320,16 +316,12 @@ public class PubQueueCache {
         return agentQueue;
     }
 
-    private void sleep(long delay) {
-        try {
-            Thread.sleep(delay);
-        } catch (InterruptedException ignore) {
-            // ignore
-        }
-    }
-
     private void handlePackage(final MessageInfo info, final PackageMessage 
message) {
         merge(singletonList(new FullMessage<>(info, message)));
+        if (seeded.getCount() > 0) {
+            LOG.info("Cache has been seeded");
+        }
         seeded.countDown();
+        seeder.interrupt();
     }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 2f06b49..5ba6d6b 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -94,7 +94,12 @@ public class PubQueueCacheService implements Runnable {
     }
 
     public OffsetQueue<DistributionQueueItem> getOffsetQueue(String 
pubAgentName, long minOffset) {
-        return cache.getOffsetQueue(pubAgentName, minOffset);
+        try {
+            return cache.getOffsetQueue(pubAgentName, minOffset);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
     }
 
     private void cleanup() {

Reply via email to