This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new c291dd2  SLING-9162 - Timeout when failing to fetch the queue state
c291dd2 is described below

commit c291dd2ebbfd01a90b86ba8cb23aa95b4a180323
Author: tmaret <[email protected]>
AuthorDate: Thu Mar 26 15:12:03 2020 +0100

    SLING-9162 - Timeout when failing to fetch the queue state
---
 .../distribution/journal/impl/queue/impl/PubQueueCache.java | 13 ++++++++++---
 .../journal/impl/queue/impl/PubQueueCacheService.java       |  3 +++
 .../journal/impl/queue/impl/PubQueueProviderImpl.java       |  1 +
 3 files changed, 14 insertions(+), 3 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 7eb3451..189513c 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -19,6 +19,7 @@
 package org.apache.sling.distribution.journal.impl.queue.impl;
 
 
+import static java.lang.System.currentTimeMillis;
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -76,6 +77,8 @@ public class PubQueueCache {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PubQueueCache.class);
 
+    private static final long MAX_FETCH_WAIT_MS = 60 * 1000; // 1 minute
+
     /**
      * (pubAgentName x OffsetQueue)
      */
@@ -226,7 +229,10 @@ public class PubQueueCache {
             // a head poller will block until the head poller is
             // available. The headPollerLock guarantees to not
             // run head pollers concurrently.
-            headPollerLock.lock();
+            boolean locked = headPollerLock.tryLock(MAX_FETCH_WAIT_MS, 
MILLISECONDS);
+            if (! locked) {
+                throw new RuntimeException("Gave up fetching queue state");
+            }
             try {
 
                 // Once the headPollerLock has been acquired,
@@ -259,14 +265,15 @@ public class PubQueueCache {
     }
 
     private void waitSeeded() throws InterruptedException {
-        while (!closed) {
+        long start = currentTimeMillis();
+        while (!closed && currentTimeMillis() - start < MAX_FETCH_WAIT_MS) {
             if (seeded.await(seedingDelayMs, MILLISECONDS)) {
                 return;
             } else {
                 LOG.debug("Waiting for seeded cache");
             }
         }
-        throw new InterruptedException("Cache is closed");
+        throw new RuntimeException("Gave up waiting for seeded cache");
     }
 
     protected long getMinOffset() {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 46a2d00..c5eeb8c 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -18,6 +18,8 @@
  */
 package org.apache.sling.distribution.journal.impl.queue.impl;
 
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
@@ -104,6 +106,7 @@ public class PubQueueCacheService implements Runnable {
         LOG.info("Stopped Publisher queue cache service");
     }
 
+    @Nonnull
     public OffsetQueue<DistributionQueueItem> getOffsetQueue(String 
pubAgentName, long minOffset) {
         try {
             return cache.getOffsetQueue(pubAgentName, minOffset);
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index 3b7feab..cc9e839 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -25,6 +25,7 @@ import java.io.Closeable;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 

Reply via email to