This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 5c2aa91f7494af3378866f032176e2f1a7d64766
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Fri Apr 12 14:47:44 2024 +0200

    SLING-12292 - Split metrics into publisher and subscriber
---
 .../journal/bookkeeper/BookKeeper.java             |  55 +++---
 .../journal/bookkeeper/BookKeeperFactory.java      |   6 +-
 .../impl/publisher/DistributionPublisher.java      |  29 ++--
 .../impl/publisher/MessagingCacheCallback.java     |  10 +-
 .../impl/publisher/PubQueueProviderPublisher.java  |   6 +-
 .../impl/subscriber/DistributionSubscriber.java    |  14 +-
 .../journal/shared/PublishMetrics.java             | 188 +++++++++++++++++++++
 ...nMetricsService.java => SubscriberMetrics.java} | 103 ++---------
 .../journal/bookkeeper/BookKeeperTest.java         |   8 +-
 .../impl/publisher/DistributionPublisherTest.java  |  12 +-
 .../impl/publisher/MessagingCacheCallbackTest.java |   8 +-
 .../journal/impl/subscriber/SubscriberTest.java    |   4 +-
 ...ServiceTest.java => SubscriberMetricsTest.java} |  14 +-
 13 files changed, 274 insertions(+), 183 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 2bb1a84..2a2b023 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -49,7 +49,7 @@ import 
org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.SubscriberMetrics;
 import org.apache.sling.distribution.journal.shared.NoOpImportPostProcessor;
 import org.apache.sling.distribution.journal.shared.NoOpInvalidationProcessor;
 import org.osgi.service.event.Event;
@@ -88,7 +88,7 @@ public class BookKeeper {
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
     private final ResourceResolverFactory resolverFactory;
-    private final DistributionMetricsService distributionMetricsService;
+    private final SubscriberMetrics subscriberMetrics;
     private final PackageHandler packageHandler;
     private final EventAdmin eventAdmin;
     private final Consumer<PackageStatusMessage> sender;
@@ -103,14 +103,7 @@ public class BookKeeper {
     private final InvalidationProcessor invalidationProcessor;
     private int skippedCounter = 0;
 
-    public BookKeeper(ResourceResolverFactory resolverFactory, 
DistributionMetricsService distributionMetricsService,
-        PackageHandler packageHandler, EventAdmin eventAdmin, 
Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender,
-        BookKeeperConfig config) {
-        this(resolverFactory, distributionMetricsService, packageHandler, 
eventAdmin, sender,
-            logSender, config, new NoOpImportPostProcessor(), new 
NoOpInvalidationProcessor());
-    }
-    
-    public BookKeeper(ResourceResolverFactory resolverFactory, 
DistributionMetricsService distributionMetricsService,
+    public BookKeeper(ResourceResolverFactory resolverFactory, 
SubscriberMetrics subscriberMetrics,
         PackageHandler packageHandler, EventAdmin eventAdmin, 
Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender,
         BookKeeperConfig config, ImportPostProcessor importPostProcessor, 
InvalidationProcessor invalidationProcessor) {
         this.packageHandler = packageHandler;
@@ -118,10 +111,10 @@ public class BookKeeper {
         this.sender = sender;
         this.logSender = logSender;
         this.config = config;
-        String nameRetries = DistributionMetricsService.SUB_COMPONENT + 
".current_retries;sub_name=" + config.getSubAgentName();
-        distributionMetricsService.createGauge(nameRetries, 
packageRetries::getSum);
+        
+        subscriberMetrics.currentRetries(config.getSubAgentName(), 
packageRetries::getSum);
         this.resolverFactory = resolverFactory;
-        this.distributionMetricsService = distributionMetricsService;
+        this.subscriberMetrics = subscriberMetrics;
         // Error queues are enabled when the number
         // of retry attempts is limited ; disabled otherwise
         this.errorQueueEnabled = (config.getMaxRetries() >= 0);
@@ -150,7 +143,7 @@ public class BookKeeper {
      */
     public void importPackage(PackageMessage pkgMsg, long offset, long 
createdTime) throws DistributionException {
         log.debug("Importing distribution package {} at offset={}", pkgMsg, 
offset);
-        try (Timer.Context context = 
distributionMetricsService.getImportedPackageDuration().time();
+        try (Timer.Context context = 
subscriberMetrics.getImportedPackageDuration().time();
                 ResourceResolver importerResolver = 
getServiceResolver(SUBSERVICE_IMPORTER)) {
             packageHandler.apply(importerResolver, pkgMsg);
             if (config.isEditable()) {
@@ -158,8 +151,8 @@ public class BookKeeper {
             }
             storeOffset(importerResolver, offset);
             importerResolver.commit();
-            
distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
-            
distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis()
 - createdTime), TimeUnit.MILLISECONDS);
+            
subscriberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength());
+            
subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - 
createdTime), TimeUnit.MILLISECONDS);
             
             // Execute the post-processor
             postProcess(pkgMsg);
@@ -169,7 +162,7 @@ public class BookKeeper {
             Event event = new AppliedEvent(pkgMsg, 
config.getSubAgentName()).toEvent();
             eventAdmin.postEvent(event);
             log.info("Imported distribution package {} at offset={}", pkgMsg, 
offset);
-            
distributionMetricsService.getPackageStatusCounter(Status.IMPORTED.name()).increment();
+            
subscriberMetrics.getPackageStatusCounter(Status.IMPORTED.name()).increment();
         } catch (DistributionException | LoginException | IOException | 
RuntimeException | ImportPostProcessException e) {
             failure(pkgMsg, offset, e);
         }
@@ -184,7 +177,7 @@ public class BookKeeper {
             props.put(DISTRIBUTION_PACKAGE_ID, pkgMsg.getPkgId());
 
             long invalidationStartTime = currentTimeMillis();
-            
distributionMetricsService.getInvalidationProcessRequest().increment();
+            subscriberMetrics.getInvalidationProcessRequest().increment();
 
             invalidationProcessor.process(props);
 
@@ -202,9 +195,9 @@ public class BookKeeper {
 
             log.info("Invalidated the cache for the package {} at offset={}", 
pkgMsg, offset);
 
-            
distributionMetricsService.getPackageStatusCounter(Status.IMPORTED.name()).increment();
-            
distributionMetricsService.getInvalidationProcessDuration().update((currentTimeMillis()
 - invalidationStartTime), TimeUnit.MILLISECONDS);
-            
distributionMetricsService.getInvalidationProcessSuccess().increment();
+            
subscriberMetrics.getPackageStatusCounter(Status.IMPORTED.name()).increment();
+            
subscriberMetrics.getInvalidationProcessDuration().update((currentTimeMillis() 
- invalidationStartTime), TimeUnit.MILLISECONDS);
+            subscriberMetrics.getInvalidationProcessSuccess().increment();
         } catch (LoginException | PersistenceException | 
InvalidationProcessException e) {
             failure(pkgMsg, offset, e);
         }
@@ -219,13 +212,13 @@ public class BookKeeper {
         props.put(DISTRIBUTION_PACKAGE_ID, pkgMsg.getPkgId());
 
         long postProcessStartTime = currentTimeMillis();
-        distributionMetricsService.getImportPostProcessRequest().increment();
+        subscriberMetrics.getImportPostProcessRequest().increment();
         importPostProcessor.process(props);
 
         log.debug("Executed import post processor for package [{}]", 
pkgMsg.getPkgId());
 
-        
distributionMetricsService.getImportPostProcessDuration().update((currentTimeMillis()
 - postProcessStartTime), TimeUnit.MILLISECONDS);
-        distributionMetricsService.getImportPostProcessSuccess().increment();
+        
subscriberMetrics.getImportPostProcessDuration().update((currentTimeMillis() - 
postProcessStartTime), TimeUnit.MILLISECONDS);
+        subscriberMetrics.getImportPostProcessSuccess().increment();
     }
     
     /**
@@ -238,7 +231,7 @@ public class BookKeeper {
      * @throws DistributionException if the package should be retried
      */
     private void failure(PackageMessage pkgMsg, long offset, Exception e) 
throws DistributionException {
-        distributionMetricsService.getFailedPackageImports().mark();
+        subscriberMetrics.getFailedPackageImports().mark();
 
         String pubAgentName = pkgMsg.getPubAgentName();
         int retries = packageRetries.get(pubAgentName);
@@ -255,7 +248,7 @@ public class BookKeeper {
         if (giveUp) {
             log.warn(msg, e);
             removeFailedPackage(pkgMsg, offset);
-            distributionMetricsService.getPermanentImportErrors().increment();
+            subscriberMetrics.getPermanentImportErrors().increment();
         } else {
             packageRetries.increase(pubAgentName);
             throw new DistributionException(msg, e);
@@ -278,7 +271,7 @@ public class BookKeeper {
     public void removePackage(PackageMessage pkgMsg, long offset) throws 
LoginException, PersistenceException {
         log.info("Removing distribution package {} of type {} at offset {}", 
                 pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
-        Timer.Context context = 
distributionMetricsService.getRemovedPackageDuration().time();
+        Timer.Context context = 
subscriberMetrics.getRemovedPackageDuration().time();
         try (ResourceResolver resolver = 
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             if (config.isEditable()) {
                 storeStatus(resolver, new PackageStatus(Status.REMOVED, 
offset, pkgMsg.getPubAgentName()));
@@ -288,7 +281,7 @@ public class BookKeeper {
         }
         packageRetries.clear(pkgMsg.getPubAgentName());
         context.stop();
-        
distributionMetricsService.getPackageStatusCounter(Status.REMOVED.name()).increment();
+        
subscriberMetrics.getPackageStatusCounter(Status.REMOVED.name()).increment();
     }
     
     public void skipPackage(long offset) throws LoginException, 
PersistenceException {
@@ -373,7 +366,7 @@ public class BookKeeper {
     public void clearPackageRetriesOnSuccess(PackageMessage pkgMsg) {
         String pubAgentName = pkgMsg.getPubAgentName();
         if (packageRetries.get(pubAgentName) > 0) {
-            distributionMetricsService.getTransientImportErrors().increment();
+            subscriberMetrics.getTransientImportErrors().increment();
         }
 
         packageRetries.clear(pubAgentName);
@@ -395,7 +388,7 @@ public class BookKeeper {
 
     private void removeFailedPackage(PackageMessage pkgMsg, long offset) 
throws DistributionException {
         log.info("Removing failed distribution package {} at offset={}", 
pkgMsg, offset);
-        Timer.Context context = 
distributionMetricsService.getRemovedFailedPackageDuration().time();
+        Timer.Context context = 
subscriberMetrics.getRemovedFailedPackageDuration().time();
         try (ResourceResolver resolver = 
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             storeStatus(resolver, new PackageStatus(Status.REMOVED_FAILED, 
offset, pkgMsg.getPubAgentName()));
             storeOffset(resolver, offset);
@@ -404,7 +397,7 @@ public class BookKeeper {
             throw new DistributionException("Error removing failed package", 
e);
         }
         context.stop();
-        
distributionMetricsService.getPackageStatusCounter(Status.REMOVED_FAILED.name()).increment();
+        
subscriberMetrics.getPackageStatusCounter(Status.REMOVED_FAILED.name()).increment();
     }
 
     private void storeStatus(ResourceResolver resolver, PackageStatus 
packageStatus) throws PersistenceException {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
index b2df2ef..79ee33e 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
@@ -27,7 +27,7 @@ import org.apache.sling.distribution.InvalidationProcessor;
 import org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.BinaryStore;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.SubscriberMetrics;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
@@ -39,7 +39,7 @@ public class BookKeeperFactory {
     private ResourceResolverFactory resolverFactory;
     
     @Reference
-    private DistributionMetricsService distributionMetricsService;
+    private SubscriberMetrics subscriberMetrics;
     
     @Reference
     private EventAdmin eventAdmin;
@@ -69,7 +69,7 @@ public class BookKeeperFactory {
         PackageHandler packageHandler = new PackageHandler(packageBuilder, 
extractor, binaryStore);
         return new BookKeeper(
                 resolverFactory, 
-                distributionMetricsService, 
+                subscriberMetrics, 
                 packageHandler,
                 eventAdmin, 
                 statusSender,
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index a2a8874..ae1dc4a 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -23,7 +23,7 @@ import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static org.apache.sling.distribution.DistributionRequestState.ACCEPTED;
 import static org.apache.sling.distribution.DistributionRequestType.*;
-import static 
org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed;
+import static 
org.apache.sling.distribution.journal.shared.SubscriberMetrics.timed;
 import static 
org.apache.sling.distribution.journal.shared.Strings.requireNotBlank;
 import static 
org.osgi.service.component.annotations.ReferenceCardinality.OPTIONAL;
 import static 
org.osgi.service.component.annotations.ReferencePolicyOption.GREEDY;
@@ -44,7 +44,7 @@ import 
org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
 import 
org.apache.sling.distribution.journal.shared.DistributionLogEventListener;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.PublishMetrics;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
@@ -89,7 +89,7 @@ public class DistributionPublisher implements 
DistributionAgent {
 
     private final EventAdmin eventAdmin;
 
-    private final DistributionMetricsService distributionMetricsService;
+    private final PublishMetrics publishMetrics;
 
     private final PubQueueProvider pubQueueProvider;
 
@@ -124,7 +124,7 @@ public class DistributionPublisher implements 
DistributionAgent {
             @Reference
             Topics topics,
             @Reference
-            DistributionMetricsService distributionMetricsService,
+            PublishMetrics publishMetrics,
             @Reference
             PubQueueProvider pubQueueProvider,
             @Reference(target = "(osgi.condition.id=toggle.FT_SLING-12218)", 
cardinality = OPTIONAL, policyOption = GREEDY)
@@ -135,7 +135,7 @@ public class DistributionPublisher implements 
DistributionAgent {
         this.packageBuilder = packageBuilder;
         this.factory = requireNonNull(factory);
         this.eventAdmin = eventAdmin;
-        this.distributionMetricsService = 
requireNonNull(distributionMetricsService);
+        this.publishMetrics = requireNonNull(publishMetrics);
         this.pubQueueProvider = pubQueueProvider;
 
         pubAgentName = requireNotBlank(config.name());
@@ -149,8 +149,7 @@ public class DistributionPublisher implements 
DistributionAgent {
         pkgType = packageBuilder.getType();
 
         this.sender = messagingProvider.createSender(topics.getPackageTopic());
-        distributionMetricsService.createGauge(
-                DistributionMetricsService.PUB_COMPONENT + 
".subscriber_count;pub_name=" + pubAgentName,
+        publishMetrics.subscriberCount(pubAgentName,
                 () -> 
discoveryService.getTopologyView().getSubscribedAgentIds(pubAgentName).size()
         );
         
@@ -181,11 +180,11 @@ public class DistributionPublisher implements 
DistributionAgent {
         try {
             DistributionQueue queue = pubQueueProvider.getQueue(pubAgentName, 
queueName);
             if (queue == null) {
-                
distributionMetricsService.getQueueAccessErrorCount().increment();
+                publishMetrics.getQueueAccessErrorCount().increment();
             }
             return queue;
         } catch (Exception e) {
-            distributionMetricsService.getQueueAccessErrorCount().increment();
+            publishMetrics.getQueueAccessErrorCount().increment();
             throw e;
         }
     }
@@ -247,9 +246,9 @@ public class DistributionPublisher implements 
DistributionAgent {
             if (request.getRequestType() != TEST && request.getPaths().length 
== 0) {
                 throw new DistributionException("Empty paths are not allowed");
             }
-            return timed(distributionMetricsService.getBuildPackageDuration(), 
() -> factory.create(packageBuilder, resourceResolver, pubAgentName, request));
+            return timed(publishMetrics.getBuildPackageDuration(), () -> 
factory.create(packageBuilder, resourceResolver, pubAgentName, request));
         } catch (Exception e) {
-            distributionMetricsService.getDroppedRequests().mark();
+            publishMetrics.getDroppedRequests().mark();
             String msg = format("Failed to create content package for 
requestType=%s, paths=%s. Error=%s",
                     request.getRequestType(), 
Arrays.toString(request.getPaths()), e.getMessage());
             distLog.error(msg, e);
@@ -260,14 +259,14 @@ public class DistributionPublisher implements 
DistributionAgent {
     @Nonnull
     private DistributionResponse send(final PackageMessage pkg, int queueSize, 
int delayMS) throws DistributionException {
         try {
-            long offset = 
timed(distributionMetricsService.getEnqueuePackageDuration(), () -> 
this.sendAndWait(pkg));
-            
distributionMetricsService.getExportedPackageSize().update(pkg.getPkgLength());
-            distributionMetricsService.getAcceptedRequests().mark();
+            long offset = timed(publishMetrics.getEnqueuePackageDuration(), () 
-> this.sendAndWait(pkg));
+            publishMetrics.getExportedPackageSize().update(pkg.getPkgLength());
+            publishMetrics.getAcceptedRequests().mark();
             String msg = format("Request accepted with distribution package %s 
at offset=%d, queueSize=%d, queueSizeDelay=%d", pkg, offset, queueSize, 
delayMS);
             distLog.info(msg);
             return new SimpleDistributionResponse(ACCEPTED, msg, 
pkg::getPkgId);
         } catch (Throwable e) {
-            distributionMetricsService.getDroppedRequests().mark();
+            publishMetrics.getDroppedRequests().mark();
             String msg = format("Failed to append distribution package %s to 
the journal", pkg);
             distLog.error(msg, e);
             if (e instanceof Error) {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
index 9f5f84d..23e8df7 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
@@ -39,7 +39,7 @@ import 
org.apache.sling.distribution.journal.queue.CacheCallback;
 import org.apache.sling.distribution.journal.queue.ClearCallback;
 import org.apache.sling.distribution.journal.queue.QueueState;
 import org.apache.sling.distribution.journal.shared.AgentId;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.PublishMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +50,7 @@ public class MessagingCacheCallback implements CacheCallback {
 
     private final String packageTopic;
 
-    private final DistributionMetricsService distributionMetricsService;
+    private final PublishMetrics publishMetrics;
 
     private final DiscoveryService discoveryService;
 
@@ -59,12 +59,12 @@ public class MessagingCacheCallback implements 
CacheCallback {
     public MessagingCacheCallback(
             MessagingProvider messagingProvider, 
             String packageTopic, 
-            DistributionMetricsService distributionMetricsService,
+            PublishMetrics publishMetrics,
             DiscoveryService discoveryService,
             Consumer<ClearCommand> commandSender) {
         this.messagingProvider = messagingProvider;
         this.packageTopic = packageTopic;
-        this.distributionMetricsService = distributionMetricsService;
+        this.publishMetrics = publishMetrics;
         this.discoveryService = discoveryService;
         this.commandSender = commandSender;
     }
@@ -84,7 +84,7 @@ public class MessagingCacheCallback implements CacheCallback {
     
     @Override
     public List<FullMessage<PackageMessage>> fetchRange(long minOffset, long 
maxOffset) throws InterruptedException {
-        distributionMetricsService.getQueueCacheFetchCount().increment();
+        publishMetrics.getQueueCacheFetchCount().increment();
         return new RangePoller(messagingProvider, packageTopic, minOffset, 
maxOffset, RangePoller.DEFAULT_SEED_DELAY_SECONDS)
                 .fetchRange();
     }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
index bc44823..7f64427 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java
@@ -32,7 +32,7 @@ import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.queue.CacheCallback;
 import org.apache.sling.distribution.journal.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.queue.PubQueueProviderFactory;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.PublishMetrics;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
@@ -58,7 +58,7 @@ public class PubQueueProviderPublisher {
     private Topics topics;
     
     @Reference
-    private DistributionMetricsService distributionMetricsService;
+    private PublishMetrics publishMetrics;
 
     @Reference
     private PubQueueProviderFactory pubQueueProviderFactory;
@@ -75,7 +75,7 @@ public class PubQueueProviderPublisher {
         CacheCallback callback = new MessagingCacheCallback(
                 messagingProvider, 
                 topics.getPackageTopic(), 
-                distributionMetricsService,
+                publishMetrics,
                 discoveryService,
                 commandSender);
         this.pubQueueProvider = pubQueueProviderFactory.create(callback);
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 9f805f6..d09a8e7 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -71,7 +71,7 @@ import 
org.apache.sling.distribution.journal.messages.OffsetMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.shared.Delay;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.SubscriberMetrics;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.settings.SlingSettingsService;
@@ -119,7 +119,7 @@ public class DistributionSubscriber {
     private Precondition precondition;
 
     @Reference
-    private DistributionMetricsService distributionMetricsService;
+    private SubscriberMetrics subscriberMetrics;
 
     @Reference
     BookKeeperFactory bookKeeperFactory;
@@ -270,7 +270,7 @@ public class DistributionSubscriber {
 
     private void handlePackageMessage(MessageInfo info, PackageMessage 
message) {
         if (shouldEnqueue(info, message)) {
-            distributionMetricsService.getPackageJournalDistributionDuration()
+            subscriberMetrics.getPackageJournalDistributionDuration()
                     .update((currentTimeMillis() - info.getCreateTime()), 
TimeUnit.MILLISECONDS);
             enqueue(new FullMessage<>(info, message));
         } else {
@@ -307,7 +307,7 @@ public class DistributionSubscriber {
         try {
             while (running) {
                 if (messageBuffer.offer(message, 1000, TimeUnit.MILLISECONDS)) 
{
-                    
distributionMetricsService.getItemsBufferSize().increment();
+                    subscriberMetrics.getItemsBufferSize().increment();
                     return;
                 }
             }
@@ -342,10 +342,10 @@ public class DistributionSubscriber {
             DistributionException, ImportPostProcessException {
         blockingSendStoredStatus();
         FullMessage<PackageMessage> item = blockingPeekQueueItem();
-        try (Timer.Context context = 
distributionMetricsService.getProcessQueueItemDuration().time()) {
+        try (Timer.Context context = 
subscriberMetrics.getProcessQueueItemDuration().time()) {
             processQueueItem(item);
             messageBuffer.remove();
-            distributionMetricsService.getItemsBufferSize().decrement();
+            subscriberMetrics.getItemsBufferSize().decrement();
             catchAllDelay = catchAllDelays.get();
         }
     }
@@ -354,7 +354,7 @@ public class DistributionSubscriber {
      * Send status stored in a previous run if exists
      */
     private void blockingSendStoredStatus() throws InterruptedException, 
IOException {
-        try (Timer.Context context = 
distributionMetricsService.getSendStoredStatusDuration().time()) {
+        try (Timer.Context context = 
subscriberMetrics.getSendStoredStatusDuration().time()) {
             int retry = 0;
             while (running) {
                 if (bookKeeper.sendStoredStatus(retry)) {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/shared/PublishMetrics.java
 
b/src/main/java/org/apache/sling/distribution/journal/shared/PublishMetrics.java
new file mode 100644
index 0000000..430c8cb
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/shared/PublishMetrics.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.shared;
+
+import static java.lang.String.format;
+
+import java.util.function.Supplier;
+
+import org.apache.sling.commons.metrics.Counter;
+import org.apache.sling.commons.metrics.Gauge;
+import org.apache.sling.commons.metrics.Histogram;
+import org.apache.sling.commons.metrics.Meter;
+import org.apache.sling.commons.metrics.MetricsService;
+import org.apache.sling.commons.metrics.Timer;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+
+@Component(service = PublishMetrics.class)
+public class PublishMetrics {
+
+    public static final String BASE_COMPONENT = "distribution.journal";
+
+    public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
+
+    private final MetricsService metricsService;
+
+    private final  Histogram exportedPackageSize;
+
+    private final  Meter acceptedRequests;
+
+    private final  Meter droppedRequests;
+
+    private final  Timer buildPackageDuration;
+
+    private final  Timer enqueuePackageDuration;
+
+    private final  Counter queueCacheFetchCount;
+
+    private final  Counter queueAccessErrorCount;
+
+    @Activate
+    public PublishMetrics(@Reference MetricsService metricsService) {
+        this.metricsService = metricsService;
+        exportedPackageSize = getHistogram(getMetricName(PUB_COMPONENT, 
"exported_package_size"));
+        acceptedRequests = getMeter(getMetricName(PUB_COMPONENT, 
"accepted_requests"));
+        droppedRequests = getMeter(getMetricName(PUB_COMPONENT, 
"dropped_requests"));
+        buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, 
"build_package_duration"));
+        enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, 
"enqueue_package_duration"));
+        queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, 
"queue_cache_fetch_count"));
+        queueAccessErrorCount = getCounter(getMetricName(PUB_COMPONENT, 
"queue_access_error_count"));
+    }
+
+    /**
+     * Histogram of the exported content package size in Bytes.
+     *
+     * @return a Sling Metrics histogram
+     */
+    public Histogram getExportedPackageSize() {
+        return exportedPackageSize;
+    }
+
+    /**
+     * Meter of requests returning an {@code 
DistributionRequestState.ACCEPTED} state.
+     *
+     * @return a Sling Metrics meter
+     */
+    public Meter getAcceptedRequests() {
+        return acceptedRequests;
+    }
+
+    /**
+     * Meter of requests returning an {@code DistributionRequestState.DROPPED} 
state.
+     *
+     * @return a Sling Metrics meter
+     */
+    public Meter getDroppedRequests() {
+        return droppedRequests;
+    }
+
+    /**
+     * Timer capturing the duration in ms of building a content package
+     *
+     * @return a Sling Metric timer
+     */
+    public Timer getBuildPackageDuration() {
+        return buildPackageDuration;
+    }
+
+    /**
+     * Timer capturing the duration in ms of adding a package to the queue
+     *
+     * @return a Sling Metric timer
+     */
+    public Timer getEnqueuePackageDuration() {
+        return enqueuePackageDuration;
+    }
+
+    /**
+     * Counter of fetch operations to feed the queue cache.
+     *
+     * @return a Sling Metric counter
+     */
+    public Counter getQueueCacheFetchCount() {
+        return queueCacheFetchCount;
+    }
+
+    /**
+     * Counter of queue access errors.
+     *
+     * @return a Sling Metric counter
+     */
+    public Counter getQueueAccessErrorCount() {
+        return queueAccessErrorCount;
+    }
+
+    /**
+     * Counter of journal error codes.
+     *
+     * @return a Sling Metric counter
+     */
+    public Counter getJournalErrorCodeCount(String errorCode) {
+        return getCounter(
+            getNameWithLabel(getMetricName(BASE_COMPONENT, 
"journal_unavailable_error_code_count"), "error_code", errorCode));
+    }
+
+    /**
+     * Counter for all the different package status.
+     *
+     * @return a Sling Metric counter
+     */
+    public Counter getPackageStatusCounter(String status) {
+        return getCounter(
+                getNameWithLabel(getMetricName(BASE_COMPONENT, 
"package_status_count"), "status", status)
+        );
+    }
+    
+    public void subscriberCount(String pubAgentName, Supplier<Integer> 
subscriberCountCallback) {
+        createGauge(PublishMetrics.PUB_COMPONENT + 
".subscriber_count;pub_name=" + pubAgentName,
+                subscriberCountCallback);
+        
+    }
+
+    private <T> Gauge<T> createGauge(String name, Supplier<T> supplier) {
+        return metricsService.gauge(name, supplier);
+    }
+
+    private String getMetricName(String component, String name) {
+        return format("%s.%s", component, name);
+    }
+
+    private String getNameWithLabel(String name, String label, String 
labelVal) {
+        return format("%s;%s=%s", name, label, labelVal);
+    }
+
+    private Counter getCounter(String metricName) {
+        return metricsService.counter(metricName);
+    }
+
+    private Timer getTimer(String metricName) {
+        return metricsService.timer(metricName);
+    }
+
+    private Histogram getHistogram(String metricName) {
+        return metricsService.histogram(metricName);
+    }
+
+    private Meter getMeter(String metricName) {
+        return metricsService.meter(metricName);
+    }
+
+}
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
 
b/src/main/java/org/apache/sling/distribution/journal/shared/SubscriberMetrics.java
similarity index 77%
rename from 
src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
rename to 
src/main/java/org/apache/sling/distribution/journal/shared/SubscriberMetrics.java
index c77e37f..b0d656c 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/shared/SubscriberMetrics.java
@@ -33,25 +33,17 @@ import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 
-@Component(service = DistributionMetricsService.class)
-public class DistributionMetricsService {
+@Component(service = SubscriberMetrics.class)
+public class SubscriberMetrics {
 
     public static final String BASE_COMPONENT = "distribution.journal";
 
-    public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
-
     public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
     
     private final MetricsService metricsService;
 
     private final Histogram importedPackageSize;
 
-    private final  Histogram exportedPackageSize;
-
-    private final  Meter acceptedRequests;
-
-    private final  Meter droppedRequests;
-
     private final  Counter itemsBufferSize;
 
     private final  Timer removedPackageDuration;
@@ -70,14 +62,6 @@ public class DistributionMetricsService {
 
     private final  Timer packageJournalDistributionDuration;
 
-    private final  Timer buildPackageDuration;
-
-    private final  Timer enqueuePackageDuration;
-
-    private final  Counter queueCacheFetchCount;
-
-    private final  Counter queueAccessErrorCount;
-
     private final  Timer importPostProcessDuration;
     
     private final  Counter importPostProcessSuccess;
@@ -95,14 +79,8 @@ public class DistributionMetricsService {
     private final  Counter permanentImportErrors;
 
     @Activate
-    public DistributionMetricsService(@Reference MetricsService 
metricsService) {
+    public SubscriberMetrics(@Reference MetricsService metricsService) {
         this.metricsService = metricsService;
-        exportedPackageSize = getHistogram(getMetricName(PUB_COMPONENT, 
"exported_package_size"));
-        acceptedRequests = getMeter(getMetricName(PUB_COMPONENT, 
"accepted_requests"));
-        droppedRequests = getMeter(getMetricName(PUB_COMPONENT, 
"dropped_requests"));
-        buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, 
"build_package_duration"));
-        enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, 
"enqueue_package_duration"));
-        queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, 
"queue_cache_fetch_count"));
         importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, 
"imported_package_size"));
         itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, 
"items_buffer_size"));
         importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, 
"imported_package_duration"));
@@ -113,11 +91,10 @@ public class DistributionMetricsService {
         processQueueItemDuration = getTimer(getMetricName(SUB_COMPONENT, 
"process_queue_item_duration"));
         packageDistributedDuration = getTimer(getMetricName(SUB_COMPONENT, 
"request_distributed_duration"));
         packageJournalDistributionDuration = 
getTimer(getMetricName(SUB_COMPONENT, "package_journal_distribution_duration"));
-        queueAccessErrorCount = getCounter(getMetricName(PUB_COMPONENT, 
"queue_access_error_count"));
-        importPostProcessDuration = getTimer(getMetricName(PUB_COMPONENT, 
"import_post_process_duration"));
+        importPostProcessDuration = getTimer(getMetricName(SUB_COMPONENT, 
"import_post_process_duration"));
         importPostProcessSuccess = getCounter(getMetricName(SUB_COMPONENT, 
"import_post_process_success_count"));
         importPostProcessRequest = getCounter(getMetricName(SUB_COMPONENT, 
"import_post_process_request_count"));
-        invalidationProcessDuration = getTimer(getMetricName(PUB_COMPONENT, 
"invalidation_process_duration"));
+        invalidationProcessDuration = getTimer(getMetricName(SUB_COMPONENT, 
"invalidation_process_duration"));
         invalidationProcessSuccess = getCounter(getMetricName(SUB_COMPONENT, 
"invalidation_process_success_count"));
         invalidationProcessRequest = getCounter(getMetricName(SUB_COMPONENT, 
"invalidation_process_request_count"));
         transientImportErrors = getCounter(getMetricName(SUB_COMPONENT, 
"transient_import_errors"));
@@ -164,33 +141,6 @@ public class DistributionMetricsService {
         return importedPackageSize;
     }
 
-    /**
-     * Histogram of the exported content package size in Bytes.
-     *
-     * @return a Sling Metrics histogram
-     */
-    public Histogram getExportedPackageSize() {
-        return exportedPackageSize;
-    }
-
-    /**
-     * Meter of requests returning an {@code 
DistributionRequestState.ACCEPTED} state.
-     *
-     * @return a Sling Metrics meter
-     */
-    public Meter getAcceptedRequests() {
-        return acceptedRequests;
-    }
-
-    /**
-     * Meter of requests returning an {@code DistributionRequestState.DROPPED} 
state.
-     *
-     * @return a Sling Metrics meter
-     */
-    public Meter getDroppedRequests() {
-        return droppedRequests;
-    }
-
     /**
      * Counter of the package buffer size on the subscriber.
      *
@@ -274,42 +224,6 @@ public class DistributionMetricsService {
         return packageJournalDistributionDuration;
     }
 
-    /**
-     * Timer capturing the duration in ms of building a content package
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getBuildPackageDuration() {
-        return buildPackageDuration;
-    }
-
-    /**
-     * Timer capturing the duration in ms of adding a package to the queue
-     *
-     * @return a Sling Metric timer
-     */
-    public Timer getEnqueuePackageDuration() {
-        return enqueuePackageDuration;
-    }
-
-    /**
-     * Counter of fetch operations to feed the queue cache.
-     *
-     * @return a Sling Metric counter
-     */
-    public Counter getQueueCacheFetchCount() {
-        return queueCacheFetchCount;
-    }
-
-    /**
-     * Counter of queue access errors.
-     *
-     * @return a Sling Metric counter
-     */
-    public Counter getQueueAccessErrorCount() {
-        return queueAccessErrorCount;
-    }
-
     /**
      * Counter of journal error codes.
      *
@@ -331,7 +245,7 @@ public class DistributionMetricsService {
         );
     }
 
-    public <T> Gauge<T> createGauge(String name, Supplier<T> supplier) {
+    private <T> Gauge<T> createGauge(String name, Supplier<T> supplier) {
         return metricsService.gauge(name, supplier);
     }
 
@@ -391,4 +305,9 @@ public class DistributionMetricsService {
         return permanentImportErrors;
     }
 
+    public void currentRetries(String subAgentName, Supplier<Integer> 
retriesCallback) {
+        String nameRetries = SubscriberMetrics.SUB_COMPONENT + 
".current_retries;sub_name=" + subAgentName;
+        createGauge(nameRetries, retriesCallback);
+    }
+
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index 818cb7c..1df8910 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -38,7 +38,7 @@ import 
org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.SubscriberMetrics;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Before;
@@ -58,7 +58,7 @@ public class BookKeeperTest {
 
     private ResourceResolverFactory resolverFactory = new 
MockResourceResolverFactory();
 
-    private DistributionMetricsService distributionMetricsService;
+    private SubscriberMetrics subscriberMetrics;
 
     @Mock
     private EventAdmin eventAdmin;
@@ -85,10 +85,10 @@ public class BookKeeperTest {
 
     @Before
     public void before() {
-        distributionMetricsService = new 
DistributionMetricsService(MetricsService.NOOP);
+        subscriberMetrics = new SubscriberMetrics(MetricsService.NOOP);
 
         BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", 
"subSlingId", true, 10, PackageHandling.Extract, "package", true);
-        bookKeeper = new BookKeeper(resolverFactory, 
distributionMetricsService, packageHandler, eventAdmin, sender, logSender, 
bkConfig,
+        bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics, 
packageHandler, eventAdmin, sender, logSender, bkConfig,
             importPostProcessor, invalidationProcessor);
     }
 
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 37028f4..1182734 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -63,7 +63,7 @@ import 
org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.queue.impl.OffsetQueueImpl;
 import org.apache.sling.distribution.journal.queue.impl.PubQueue;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.PublishMetrics;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -109,7 +109,7 @@ public class DistributionPublisherTest {
     @Mock
     private DistributionPackageBuilder packageBuilder;
 
-    private DistributionMetricsService distributionMetricsService;
+    private PublishMetrics publishMetrics;
 
     private OsgiContext context = new OsgiContext();
 
@@ -133,7 +133,7 @@ public class DistributionPublisherTest {
     @Before
     public void before() throws Exception {
         MetricsService metricsService = 
context.registerInjectActivateService(MetricsServiceImpl.class);
-        distributionMetricsService = new 
DistributionMetricsService(metricsService);
+        publishMetrics = new PublishMetrics(metricsService);
         when(packageBuilder.getType()).thenReturn("journal");
         Map<String, String> props = Map.of("name", PUB1AGENT1,
                 "maxQueueSizeDelay", "1000");
@@ -142,7 +142,7 @@ public class DistributionPublisherTest {
         BundleContext bcontext = context.bundleContext();
         
when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender);
         publisher = new DistributionPublisher(messagingProvider, 
packageBuilder, discoveryService, factory,
-                eventAdmin, topics, distributionMetricsService, 
pubQueueProvider, Condition.INSTANCE, config, bcontext);
+                eventAdmin, topics, publishMetrics, pubQueueProvider, 
Condition.INSTANCE, config, bcontext);
         when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier);
     }
     
@@ -251,7 +251,7 @@ public class DistributionPublisherTest {
 
         DistributionQueue queue = publisher.getQueue("i_am_not_a_queue");
         assertNull(queue);
-        Counter counter = 
distributionMetricsService.getQueueAccessErrorCount();
+        Counter counter = publishMetrics.getQueueAccessErrorCount();
         assertEquals("Wrong queue counter expected",1, counter.getCount());
     }
 
@@ -265,7 +265,7 @@ public class DistributionPublisherTest {
             fail("Expected exception not thrown");
         } catch (RuntimeException expectedException) {
         }
-        Counter counter = 
distributionMetricsService.getQueueAccessErrorCount();
+        Counter counter = publishMetrics.getQueueAccessErrorCount();
         assertEquals("Wrong getQueue error counter",1, counter.getCount());
     }
 
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
index 7b148d5..ddeebd9 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
@@ -49,7 +49,7 @@ import 
org.apache.sling.distribution.journal.messages.ClearCommand;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.queue.QueueState;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.PublishMetrics;
 import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.junit.Before;
@@ -87,7 +87,7 @@ public class MessagingCacheCallbackTest {
     private JournalAvailable journalAvailable;
     
     @Mock
-    private DistributionMetricsService distributionMetricsService;
+    private PublishMetrics publishMetrics;
     
     @Mock
     private MessageHandler<PackageMessage> handler;
@@ -113,7 +113,7 @@ public class MessagingCacheCallbackTest {
     @Before
     public void before() {
         callback = new MessagingCacheCallback(messagingProvider, "package", 
-                distributionMetricsService, discovery, (command) -> 
sender.accept(command));
+                publishMetrics, discovery, (command) -> 
sender.accept(command));
     }
 
     @Test
@@ -127,7 +127,7 @@ public class MessagingCacheCallbackTest {
 
     @Test
     public void testFetchRange() throws Exception {
-        
when(distributionMetricsService.getQueueCacheFetchCount()).thenReturn(counter);
+        when(publishMetrics.getQueueCacheFetchCount()).thenReturn(counter);
         when(messagingProvider.assignTo(10L)).thenReturn("0:10");
         CompletableFuture<List<FullMessage<PackageMessage>>> result = 
CompletableFuture.supplyAsync(this::fetch);
         verify(messagingProvider, timeout(1000)).createPoller(
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 7789039..5d13f72 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -82,7 +82,7 @@ import 
org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 import org.apache.sling.distribution.journal.messages.PingMessage;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.SubscriberMetrics;
 import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
@@ -177,7 +177,7 @@ public class SubscriberTest {
     private MessageSender<PackageStatusMessage> statusSender;
 
     @Spy
-    private DistributionMetricsService distributionMetricsService = new 
DistributionMetricsService(MetricsService.NOOP);
+    private SubscriberMetrics subscriberMetrics = new 
SubscriberMetrics(MetricsService.NOOP);
     
     @Spy
     private ImportPostProcessor importPostProcessor = new 
NoOpImportPostProcessor();
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java
similarity index 83%
rename from 
src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
rename to 
src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java
index 4bca99e..be5e2e3 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java
@@ -34,14 +34,14 @@ import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
-public class DistributionMetricsServiceTest {
+public class SubscriberMetricsTest {
     
-    DistributionMetricsService metrics;
+    SubscriberMetrics metrics;
 
     @Before
     public void before() {
         MetricsService metricsService = MetricsService.NOOP;
-        metrics = new DistributionMetricsService(metricsService);
+        metrics = new SubscriberMetrics(metricsService);
     }
 
     public static void mockBehaviour(MetricsService metricsService) {
@@ -55,11 +55,6 @@ public class DistributionMetricsServiceTest {
 
     @Test
     public void testGetMetrics() {
-        assertNotNull(metrics.getAcceptedRequests());
-        assertNotNull(metrics.getBuildPackageDuration());
-        assertNotNull(metrics.getDroppedRequests());
-        assertNotNull(metrics.getEnqueuePackageDuration());
-        assertNotNull(metrics.getExportedPackageSize());
         assertNotNull(metrics.getFailedPackageImports());
         assertNotNull(metrics.getImportedPackageDuration());
         assertNotNull(metrics.getImportedPackageSize());
@@ -67,14 +62,11 @@ public class DistributionMetricsServiceTest {
         assertNotNull(metrics.getPackageDistributedDuration());
         assertNotNull(metrics.getPackageJournalDistributionDuration());
         assertNotNull(metrics.getProcessQueueItemDuration());
-        assertNotNull(metrics.getQueueCacheFetchCount());
-        assertNotNull(metrics.getQueueAccessErrorCount());
         assertNotNull(metrics.getRemovedFailedPackageDuration());
         assertNotNull(metrics.getRemovedPackageDuration());
         assertNotNull(metrics.getSendStoredStatusDuration());
         assertNotNull(metrics.getPackageStatusCounter("mockStatus"));
         assertNotNull(metrics.getTransientImportErrors());
         assertNotNull(metrics.getPermanentImportErrors());
-        metrics.createGauge("name", () -> 42);
     }
 }

Reply via email to