This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-12177
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 1e2632416423f666e7578ce0f96015eb7537efc3 Author: Christian Schneider <cschn...@adobe.com> AuthorDate: Mon Dec 4 15:57:53 2023 +0100 SLING-12177 - Only create statusPoller once --- .../journal/impl/publisher/DistributionPublisher.java | 14 +------------- .../journal/impl/publisher/PubQueueProviderPublisher.java | 14 ++++++++++++-- .../journal/impl/publisher/DistributionPublisherTest.java | 1 - 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java index da83c61..e9a74f2 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java @@ -26,7 +26,6 @@ import static org.apache.sling.distribution.DistributionRequestType.*; import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed; import static org.apache.sling.distribution.journal.shared.Strings.requireNotBlank; -import java.io.Closeable; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -41,7 +40,6 @@ import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService; import org.apache.sling.distribution.journal.impl.event.DistributionEvent; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; -import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.queue.PubQueueProvider; import org.apache.sling.distribution.journal.shared.DefaultDistributionLog; import org.apache.sling.distribution.journal.shared.DistributionLogEventListener; @@ -69,8 +67,6 @@ import org.osgi.service.event.EventAdmin; import 
org.osgi.service.metatype.annotations.Designate; import org.apache.sling.distribution.journal.MessagingProvider; -import org.apache.sling.distribution.journal.Reset; -import org.apache.sling.distribution.journal.HandlerAdapter; /** * A Publisher SCD agent which produces messages to be consumed by a {@code DistributionSubscriber} agent. @@ -113,8 +109,6 @@ public class DistributionPublisher implements DistributionAgent { private final JMXRegistration reg; - private final Closeable statusPoller; - private final DistributionLogEventListener distributionLogEventListener; @Activate @@ -167,19 +161,13 @@ public class DistributionPublisher implements DistributionAgent { () -> discoveryService.getTopologyView().getSubscribedAgentIds().size() ); - statusPoller = messagingProvider.createPoller( - topics.getStatusTopic(), - Reset.earliest, - HandlerAdapter.create(PackageStatusMessage.class, pubQueueProvider::handleStatus) - ); - log.info("Started Publisher agent {} with packageBuilder {}, queuedTimeout {}", pubAgentName, pkgType, queuedTimeout); } @Deactivate public void deactivate() { - IOUtils.closeQuietly(statusPoller, distributionLogEventListener, reg); + IOUtils.closeQuietly(distributionLogEventListener, reg); componentReg.unregister(); String msg = format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s", pubAgentName, pkgType, queuedTimeout); diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java index 2e36cab..bc44823 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java @@ -18,14 +18,17 @@ */ package org.apache.sling.distribution.journal.impl.publisher; +import java.io.Closeable; import java.util.Hashtable; import 
java.util.function.Consumer; import org.apache.commons.io.IOUtils; -import org.apache.sling.distribution.journal.JournalAvailable; +import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.MessagingProvider; +import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService; import org.apache.sling.distribution.journal.messages.ClearCommand; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.queue.CacheCallback; import org.apache.sling.distribution.journal.queue.PubQueueProvider; import org.apache.sling.distribution.journal.queue.PubQueueProviderFactory; @@ -64,6 +67,8 @@ public class PubQueueProviderPublisher { private ServiceRegistration<PubQueueProvider> reg; + private Closeable statusPoller; + @Activate public void activate(BundleContext context) { Consumer<ClearCommand> commandSender = messagingProvider.createSender(topics.getCommandTopic()); @@ -74,12 +79,17 @@ public class PubQueueProviderPublisher { discoveryService, commandSender); this.pubQueueProvider = pubQueueProviderFactory.create(callback); + this.statusPoller = messagingProvider.createPoller( + topics.getStatusTopic(), + Reset.earliest, + HandlerAdapter.create(PackageStatusMessage.class, pubQueueProvider::handleStatus) + ); reg = context.registerService(PubQueueProvider.class, this.pubQueueProvider, new Hashtable<>()); } @Deactivate public void deactivate() { - IOUtils.closeQuietly(this.pubQueueProvider); + IOUtils.closeQuietly(this.statusPoller, this.pubQueueProvider); reg.unregister(); } } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java index 8c9fb16..902819d 100644 --- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java @@ -61,7 +61,6 @@ import org.apache.sling.distribution.journal.shared.DistributionMetricsService; import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.apache.sling.distribution.queue.spi.DistributionQueue; -import org.apache.sling.settings.SlingSettingsService; import org.apache.sling.testing.mock.osgi.junit.OsgiContext; import org.junit.After; import org.junit.Before;