This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-12177
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 1e2632416423f666e7578ce0f96015eb7537efc3
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Mon Dec 4 15:57:53 2023 +0100

    SLING-12177 - Only create statusPoller once
---
 .../journal/impl/publisher/DistributionPublisher.java      | 14 +-------------
 .../journal/impl/publisher/PubQueueProviderPublisher.java  | 14 ++++++++++++--
 .../journal/impl/publisher/DistributionPublisherTest.java  |  1 -
 3 files changed, 13 insertions(+), 16 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index da83c61..e9a74f2 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -26,7 +26,6 @@ import static 
org.apache.sling.distribution.DistributionRequestType.*;
 import static 
org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed;
 import static 
org.apache.sling.distribution.journal.shared.Strings.requireNotBlank;
 
-import java.io.Closeable;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -41,7 +40,6 @@ import 
org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
 import 
org.apache.sling.distribution.journal.shared.DistributionLogEventListener;
@@ -69,8 +67,6 @@ import org.osgi.service.event.EventAdmin;
 import org.osgi.service.metatype.annotations.Designate;
 
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.HandlerAdapter;
 
 /**
  * A Publisher SCD agent which produces messages to be consumed by a {@code 
DistributionSubscriber} agent.
@@ -113,8 +109,6 @@ public class DistributionPublisher implements 
DistributionAgent {
 
     private final JMXRegistration reg;
 
-    private final Closeable statusPoller;
-
     private final DistributionLogEventListener distributionLogEventListener;
 
     @Activate
@@ -167,19 +161,13 @@ public class DistributionPublisher implements 
DistributionAgent {
                 () -> 
discoveryService.getTopologyView().getSubscribedAgentIds().size()
         );
         
-        statusPoller = messagingProvider.createPoller(
-                topics.getStatusTopic(),
-                Reset.earliest,
-                HandlerAdapter.create(PackageStatusMessage.class, 
pubQueueProvider::handleStatus)
-                );
-        
         log.info("Started Publisher agent {} with packageBuilder {}, 
queuedTimeout {}",
                 pubAgentName, pkgType, queuedTimeout);
     }
 
     @Deactivate
     public void deactivate() {
-        IOUtils.closeQuietly(statusPoller, distributionLogEventListener, reg);
+        IOUtils.closeQuietly(distributionLogEventListener, reg);
         componentReg.unregister();
         String msg = format("Stopped Publisher agent %s with packageBuilder 
%s, queuedTimeout %s",
                 pubAgentName, pkgType, queuedTimeout);
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
index 2e36cab..bc44823 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
@@ -18,14 +18,17 @@
  */
 package org.apache.sling.distribution.journal.impl.publisher;
 
+import java.io.Closeable;
 import java.util.Hashtable;
 import java.util.function.Consumer;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
 import org.apache.sling.distribution.journal.messages.ClearCommand;
+import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.queue.CacheCallback;
 import org.apache.sling.distribution.journal.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.queue.PubQueueProviderFactory;
@@ -64,6 +67,8 @@ public class PubQueueProviderPublisher {
 
     private ServiceRegistration<PubQueueProvider> reg;
 
+    private Closeable statusPoller;
+
     @Activate
     public void activate(BundleContext context) {
         Consumer<ClearCommand> commandSender = 
messagingProvider.createSender(topics.getCommandTopic());
@@ -74,12 +79,17 @@ public class PubQueueProviderPublisher {
                 discoveryService,
                 commandSender);
         this.pubQueueProvider = pubQueueProviderFactory.create(callback);
+        this.statusPoller = messagingProvider.createPoller(
+                topics.getStatusTopic(),
+                Reset.earliest,
+                HandlerAdapter.create(PackageStatusMessage.class, 
pubQueueProvider::handleStatus)
+                );
         reg = context.registerService(PubQueueProvider.class, 
this.pubQueueProvider, new Hashtable<>());
     }
     
     @Deactivate
     public void deactivate() {
-        IOUtils.closeQuietly(this.pubQueueProvider);
+        IOUtils.closeQuietly(this.statusPoller, this.pubQueueProvider);
         reg.unregister();
     }
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 8c9fb16..902819d 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -61,7 +61,6 @@ import 
org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
-import org.apache.sling.settings.SlingSettingsService;
 import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
 import org.junit.After;
 import org.junit.Before;

Reply via email to