This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 33e2ba1 SLING-8415 - throw upon catching interrupted exception
33e2ba1 is described below
commit 33e2ba12094d7aac3f6dacdc578bbd22b1b9abbd
Author: tmaret <[email protected]>
AuthorDate: Sat May 11 22:33:43 2019 +0200
SLING-8415 - throw upon catching interrupted exception
---
.../journal/impl/queue/impl/PubQueueCache.java | 62 ++++++++++------------
.../impl/queue/impl/PubQueueCacheService.java | 7 ++-
2 files changed, 33 insertions(+), 36 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 89b90ca..89a001f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -124,6 +124,8 @@ public class PubQueueCache {
*/
private volatile boolean closed;
+ private Thread seeder;
+
public PubQueueCache(MessagingProvider messagingProvider, EventAdmin
eventAdmin, DistributionMetricsService distributionMetricsService, String
topic) {
this.messagingProvider = messagingProvider;
@@ -136,11 +138,11 @@ public class PubQueueCache {
Reset.latest,
create(PackageMessage.class, this::handlePackage));
- RunnableUtil.startBackgroundThread(this::sendSeedingMessages, "queue
seeding");
+ seeder = RunnableUtil.startBackgroundThread(this::sendSeedingMessages,
"queue seeding");
}
@Nonnull
- public OffsetQueue<DistributionQueueItem> getOffsetQueue(String
pubAgentName, long minOffset) {
+ public OffsetQueue<DistributionQueueItem> getOffsetQueue(String
pubAgentName, long minOffset) throws InterruptedException {
fetchIfNeeded(minOffset);
return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
}
@@ -151,29 +153,29 @@ public class PubQueueCache {
public void close() {
closed = true;
+ seeder.interrupt();
IOUtils.closeQuietly(tailPoller);
jmxRegs.stream().forEach(IOUtils::closeQuietly);
}
private void sendSeedingMessages() {
- LOG.info("Send seeding messages");
+ LOG.info("Start message seeder");
MessageSender<PackageMessage> sender =
messagingProvider.createSender();
- while (! closed) {
+ while (! Thread.interrupted()) {
PackageMessage pkgMsg = createTestMessage();
+ LOG.debug("Send seeding message");
try {
- LOG.debug("Send seeding message");
sender.send(topic, pkgMsg);
- if (seeded.await(SEEDING_DELAY_MS, MILLISECONDS)) {
- LOG.info("Cache has been seeded");
- return;
- }
} catch (MessagingException e) {
- LOG.info(e.getMessage());
- sleep(1000);
- } catch (InterruptedException ignore) {
- // ignore
+ LOG.warn(e.getMessage(), e);
+ }
+ try {
+ Thread.sleep(SEEDING_DELAY_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
+ LOG.info("Stop message seeder");
}
private PackageMessage createTestMessage() {
@@ -192,7 +194,7 @@ public class PubQueueCache {
*
* @param requestedMinOffset the min offset to start fetching data from
*/
- private void fetchIfNeeded(long requestedMinOffset) {
+ private void fetchIfNeeded(long requestedMinOffset) throws
InterruptedException {
// We wait on the cache to be seeded (at least one message handled)
// before computing potential missing offsets.
@@ -227,8 +229,6 @@ public class PubQueueCache {
fetch(requestedMinOffset, cachedMinOffset);
}
- } catch (Exception e) {
- throw new RuntimeException(String.format("Failed to fetch
offsets [%s,%s[, %s", requestedMinOffset, cachedMinOffset, e.getMessage()), e);
} finally {
headPollerLock.unlock();
}
@@ -249,19 +249,15 @@ public class PubQueueCache {
updateMinOffset(requestedMinOffset);
}
- private void waitSeeded() {
- while (! closed) {
- try {
- if (seeded.await(SEEDING_DELAY_MS, MILLISECONDS)) {
- return;
- } else {
- LOG.debug("Waiting for seeded cache");
- }
- } catch (InterruptedException ignore) {
- // ignore
+ private void waitSeeded() throws InterruptedException {
+ while (!closed) {
+ if (seeded.await(SEEDING_DELAY_MS, MILLISECONDS)) {
+ return;
+ } else {
+ LOG.debug("Waiting for seeded cache");
}
}
- throw new RuntimeException();
+ throw new InterruptedException("Cache is closed");
}
protected long getMinOffset() {
@@ -320,16 +316,12 @@ public class PubQueueCache {
return agentQueue;
}
- private void sleep(long delay) {
- try {
- Thread.sleep(delay);
- } catch (InterruptedException ignore) {
- // ignore
- }
- }
-
private void handlePackage(final MessageInfo info, final PackageMessage
message) {
merge(singletonList(new FullMessage<>(info, message)));
+ if (seeded.getCount() > 0) {
+ LOG.info("Cache has been seeded");
+ }
seeded.countDown();
+ seeder.interrupt();
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 2f06b49..5ba6d6b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -94,7 +94,12 @@ public class PubQueueCacheService implements Runnable {
}
public OffsetQueue<DistributionQueueItem> getOffsetQueue(String
pubAgentName, long minOffset) {
- return cache.getOffsetQueue(pubAgentName, minOffset);
+ try {
+ return cache.getOffsetQueue(pubAgentName, minOffset);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
}
private void cleanup() {