This is an automated email from the ASF dual-hosted git repository. tmaret pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 33d609d0a321e5e7e4f9a6758413437d4d4291ea
Author: tmaret <[email protected]>
AuthorDate: Tue Apr 7 23:24:23 2020 +0200

    SLING-9340 - Don't interrupt seeder thread
---
 .../journal/impl/queue/impl/PubQueueCache.java     | 33 ++++++++++------------
 1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 189513c..84e28c3 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -124,8 +124,6 @@ public class PubQueueCache {
      */
     private volatile boolean closed;
 
-    private final Thread seeder;
-
     public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, long seedingDelayMs) {
         this.messagingProvider = messagingProvider;
         this.eventAdmin = eventAdmin;
@@ -138,7 +136,7 @@ public class PubQueueCache {
                 Reset.latest,
                 create(PackageMessage.class, this::handlePackage));
 
-        seeder = RunnableUtil.startBackgroundThread(this::seedCache, "queue seeding");
+        RunnableUtil.startBackgroundThread(this::seedCache, "queue seeding");
     }
 
     @Nonnull
@@ -153,7 +151,6 @@ public class PubQueueCache {
 
     public void close() {
         closed = true;
-        seeder.interrupt();
         IOUtils.closeQuietly(tailPoller);
         jmxRegs.stream().forEach(IOUtils::closeQuietly);
     }
@@ -162,27 +159,28 @@ public class PubQueueCache {
         LOG.info("Start message seeder");
         try {
             MessageSender<PackageMessage> sender = messagingProvider.createSender();
-            sendSeedingMessages(sender);
+            do {
+                sendSeedingMessage(sender);
+            } while (! closed && ! seeded.await(seedingDelayMs, MILLISECONDS));
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
         } finally {
             LOG.info("Stop message seeder");
         }
     }
 
-    private void sendSeedingMessages(MessageSender<PackageMessage> sender) {
-        while (! Thread.interrupted()) {
-            PackageMessage pkgMsg = createTestMessage();
-            LOG.info("Send seeding message");
-            try {
-                sender.send(topic, pkgMsg);
-                sleep(seedingDelayMs);
-            } catch (MessagingException e) {
-                LOG.warn(e.getMessage(), e);
-                sleep(seedingDelayMs * 10);
-            }
+    private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
+        PackageMessage pkgMsg = createTestMessage();
+        LOG.info("Send seeding message");
+        try {
+            sender.send(topic, pkgMsg);
+        } catch (MessagingException e) {
+            LOG.warn(e.getMessage(), e);
+            delay(seedingDelayMs * 10);
         }
     }
 
-    private void sleep(long sleepMs) {
+    private static void delay(long sleepMs) {
         try {
             Thread.sleep(sleepMs);
         } catch (InterruptedException e) {
@@ -338,6 +336,5 @@ public class PubQueueCache {
             LOG.info("Cache has been seeded");
         }
         seeded.countDown();
-        seeder.interrupt();
     }
 }
