This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new c291dd2 SLING-9162 - Timeout when failing to fetch the queue state
c291dd2 is described below
commit c291dd2ebbfd01a90b86ba8cb23aa95b4a180323
Author: tmaret <[email protected]>
AuthorDate: Thu Mar 26 15:12:03 2020 +0100
SLING-9162 - Timeout when failing to fetch the queue state
---
.../distribution/journal/impl/queue/impl/PubQueueCache.java | 13 ++++++++++---
.../journal/impl/queue/impl/PubQueueCacheService.java | 3 +++
.../journal/impl/queue/impl/PubQueueProviderImpl.java | 1 +
3 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 7eb3451..189513c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -19,6 +19,7 @@
package org.apache.sling.distribution.journal.impl.queue.impl;
+import static java.lang.System.currentTimeMillis;
import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -76,6 +77,8 @@ public class PubQueueCache {
private static final Logger LOG = LoggerFactory.getLogger(PubQueueCache.class);
+ private static final long MAX_FETCH_WAIT_MS = 60 * 1000; // 1 minute
+
/**
* (pubAgentName x OffsetQueue)
*/
@@ -226,7 +229,10 @@ public class PubQueueCache {
// a head poller will block until the head poller is
// available. The headPollerLock guarantees to not
// run head pollers concurrently.
- headPollerLock.lock();
+ boolean locked = headPollerLock.tryLock(MAX_FETCH_WAIT_MS, MILLISECONDS);
+ if (! locked) {
+ throw new RuntimeException("Gave up fetching queue state");
+ }
try {
// Once the headPollerLock has been acquired,
@@ -259,14 +265,15 @@ public class PubQueueCache {
}
private void waitSeeded() throws InterruptedException {
- while (!closed) {
+ long start = currentTimeMillis();
+ while (!closed && currentTimeMillis() - start < MAX_FETCH_WAIT_MS) {
if (seeded.await(seedingDelayMs, MILLISECONDS)) {
return;
} else {
LOG.debug("Waiting for seeded cache");
}
}
- throw new InterruptedException("Cache is closed");
+ throw new RuntimeException("Gave up waiting for seeded cache");
}
protected long getMinOffset() {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 46a2d00..c5eeb8c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -18,6 +18,8 @@
*/
package org.apache.sling.distribution.journal.impl.queue.impl;
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
@@ -104,6 +106,7 @@ public class PubQueueCacheService implements Runnable {
LOG.info("Stopped Publisher queue cache service");
}
+ @Nonnull
public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) {
try {
return cache.getOffsetQueue(pubAgentName, minOffset);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index 3b7feab..cc9e839 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -25,6 +25,7 @@ import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;