This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 502b13c  SLING-12292 - Add tags to metrics (#138)
502b13c is described below

commit 502b13c2ac8dad614280c22557804c89741c08fa
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Tue Apr 16 16:11:35 2024 +0200

    SLING-12292 - Add tags to metrics (#138)
    
    * SLING-12292 - Add tags to metrics
    
    * SLING-12292 - Add tags to publish metrics
---
 .../journal/bookkeeper/BookKeeper.java             |   2 +-
 .../journal/bookkeeper/BookKeeperFactory.java      |   6 +-
 .../journal/bookkeeper/SubscriberMetrics.java      | 188 ++++++++++-----------
 .../journal/impl/discovery/DiscoveryService.java   |   4 +
 .../impl/publisher/DistributionPublisher.java      |  13 +-
 .../journal/impl/publisher/PublishMetrics.java     |  76 +++------
 .../impl/subscriber/DistributionSubscriber.java    |  10 +-
 .../journal/metrics/TaggedMetrics.java             |   5 +
 .../journal/bookkeeper/BookKeeperTest.java         |   3 +-
 .../impl/publisher/DistributionPublisherTest.java  |  22 +--
 .../journal/impl/subscriber/SubscriberTest.java    |   4 +-
 .../journal/shared/SubscriberMetricsTest.java      |   2 +-
 12 files changed, 158 insertions(+), 177 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 76697c1..98e8298 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -112,7 +112,7 @@ public class BookKeeper {
         this.logSender = logSender;
         this.config = config;
         
-        subscriberMetrics.currentRetries(config.getSubAgentName(), 
packageRetries::getSum);
+        subscriberMetrics.currentRetries(packageRetries::getSum);
         this.resolverFactory = resolverFactory;
         this.subscriberMetrics = subscriberMetrics;
         // Error queues are enabled when the number
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
index 584d404..dd1b492 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
@@ -38,9 +38,6 @@ public class BookKeeperFactory {
     @Reference
     private ResourceResolverFactory resolverFactory;
     
-    @Reference
-    private SubscriberMetrics subscriberMetrics;
-    
     @Reference
     private EventAdmin eventAdmin;
     
@@ -63,7 +60,8 @@ public class BookKeeperFactory {
             DistributionPackageBuilder packageBuilder, 
             BookKeeperConfig config, 
             Consumer<PackageStatusMessage> statusSender,
-            Consumer<LogMessage> logSender
+            Consumer<LogMessage> logSender, 
+            SubscriberMetrics subscriberMetrics
             ) {
         ContentPackageExtractor extractor = new ContentPackageExtractor(
                 packaging,
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
index 686dd0c..5846cfc 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
@@ -18,8 +18,10 @@
  */
 package org.apache.sling.distribution.journal.bookkeeper;
 
-import static java.lang.String.format;
+import static 
org.apache.sling.distribution.journal.metrics.TaggedMetrics.getMetricName;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.function.Supplier;
 
 import org.apache.sling.commons.metrics.Counter;
@@ -28,82 +30,73 @@ import org.apache.sling.commons.metrics.Meter;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.Timer;
 import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
+import org.apache.sling.distribution.journal.metrics.Tag;
 
-@Component(service = SubscriberMetrics.class)
+/**
+ * Metrics for DistributionSubscriber
+ * most metrics will have two parameters:
+ * TAG_SUB_NAME and TAG_EDITABLE
+ */
 public class SubscriberMetrics {
-    public static final String SUB_COMPONENT = 
"distribution.journal.subscriber";
+    // Name of the subscriber agent
+    private static final String TAG_SUB_NAME = "sub_name";
     
-    private final MetricsService metricsService;
-
-    private final Histogram importedPackageSize;
-
-    private final  Counter itemsBufferSize;
-
-    private final  Timer removedPackageDuration;
-
-    private final  Timer removedFailedPackageDuration;
-
-    private final  Timer importedPackageDuration;
-
-    private final  Meter failedPackageImports;
-
-    private final  Timer sendStoredStatusDuration;
-
-    private final  Timer processQueueItemDuration;
-
-    private final  Timer packageDistributedDuration;
-
-    private final  Timer packageJournalDistributionDuration;
-
-    private final  Timer importPreProcessDuration;
-
-    private final  Counter importPreProcessSuccess;
-
-    private final  Counter importPreProcessRequest;
-
-    private final  Timer importPostProcessDuration;
+    // Status of a package : 
+    private static final String TAG_STATUS = "status";
     
-    private final  Counter importPostProcessSuccess;
-
-    private final  Counter importPostProcessRequest;
-
-    private final  Timer invalidationProcessDuration;
-
-    private final  Counter invalidationProcessSuccess;
+    // Is the queue editable (true, false)
+    private static final String TAG_EDITABLE = "editable";
+    
+    public static final String SUB_COMPONENT = 
"distribution.journal.subscriber.";
+    
+    private static final String PACKAGE_STATUS_COUNT = SUB_COMPONENT + 
"package_status_count";
+    
+    // Number of packages with at least one failure to apply 
+    private static final String CURRENT_RETRIES = SUB_COMPONENT + 
"current_retries";
 
-    private final  Counter invalidationProcessRequest;
+    // Cumulated size of all packages (parameters: TAG_SUB_NAME, editable 
(golden publish))
+    private static final String IMPORTED_PACKAGE_SIZE = SUB_COMPONENT + 
"imported_package_size";
+    private static final String ITEMS_BUFFER_SIZE = SUB_COMPONENT + 
"items_buffer_size";
 
-    private final  Counter transientImportErrors;
+    // Increased on every failure to apply a package
+    private static final String FAILED_PACKAGE_IMPORTS = SUB_COMPONENT + 
"failed_package_imports";
+    
+    // Increased when a package failed before but then succeeded (parameters: 
agent, editable (golden publish))
+    private static final String TRANSIENT_IMPORT_ERRORS = SUB_COMPONENT + 
"transient_import_errors";
+
+    // Only counted in error queue setup
+    private static final String PERMANENT_IMPORT_ERRORS = SUB_COMPONENT + 
"permanent_import_errors";
+
+    private static final String IMPORT_PRE_PROCESS_REQUEST_COUNT = 
SUB_COMPONENT + "import_pre_process_request_count";
+    private static final String IMPORT_POST_PROCESS_SUCCESS_COUNT = 
SUB_COMPONENT + "import_post_process_success_count";
+    private static final String IMPORT_POST_PROCESS_REQUEST_COUNT = 
SUB_COMPONENT + "import_post_process_request_count";
+    private static final String INVALIDATION_PROCESS_SUCCESS_COUNT = 
SUB_COMPONENT + "invalidation_process_success_count";
+    private static final String INVALIDATION_PROCESS_REQUEST_COUNT = 
SUB_COMPONENT + "invalidation_process_request_count";
+    private static final String IMPORT_PRE_PROCESS_SUCCESS_COUNT = 
SUB_COMPONENT + "import_pre_process_success_count";
+
+    private static final String IMPORTED_PACKAGE_DURATION = SUB_COMPONENT + 
"imported_package_duration";
+    private static final String REMOVED_PACKAGE_DURATION = SUB_COMPONENT + 
"removed_package_duration";
+    private static final String REMOVED_FAILED_PACKAGE_DURATION = 
SUB_COMPONENT + "removed_failed_package_duration";
+    private static final String SEND_STORED_STATUS_DURATION = SUB_COMPONENT + 
"send_stored_status_duration";
+    private static final String PROCESS_QUEUE_ITEM_DURATION = SUB_COMPONENT + 
"process_queue_item_duration";
+    private static final String REQUEST_DISTRIBUTED_DURATION = SUB_COMPONENT + 
"request_distributed_duration";
+    private static final String PACKAGE_JOURNAL_DISTRIBUTION_DURATION = 
SUB_COMPONENT + "package_journal_distribution_duration";
+    private static final String IMPORT_PRE_PROCESS_DURATION = SUB_COMPONENT + 
"import_pre_process_duration";
+    private static final String IMPORT_POST_PROCESS_DURATION = SUB_COMPONENT + 
"import_post_process_duration";
+    private static final String INVALIDATION_PROCESS_DURATION = SUB_COMPONENT 
+ "invalidation_process_duration";
 
-    private final  Counter permanentImportErrors;
+    private final MetricsService metricsService;
+    private final Tag tagSubName;
+    private final Tag tagEditable;
+    private final List<Tag> tags;
 
-    @Activate
-    public SubscriberMetrics(@Reference MetricsService metricsService) {
+    public SubscriberMetrics(MetricsService metricsService, String 
subAgentName, boolean editable) {
         this.metricsService = metricsService;
-        importedPackageSize = 
metricsService.histogram(getMetricName("imported_package_size"));
-        itemsBufferSize = 
metricsService.counter(getMetricName("items_buffer_size"));
-        importedPackageDuration = 
metricsService.timer(getMetricName("imported_package_duration"));
-        removedPackageDuration = 
metricsService.timer(getMetricName("removed_package_duration"));
-        removedFailedPackageDuration = 
metricsService.timer(getMetricName("removed_failed_package_duration"));
-        failedPackageImports = 
metricsService.meter(getMetricName("failed_package_imports"));
-        sendStoredStatusDuration = 
metricsService.timer(getMetricName("send_stored_status_duration"));
-        processQueueItemDuration = 
metricsService.timer(getMetricName("process_queue_item_duration"));
-        packageDistributedDuration = 
metricsService.timer(getMetricName("request_distributed_duration"));
-        packageJournalDistributionDuration = 
metricsService.timer(getMetricName("package_journal_distribution_duration"));
-        importPreProcessDuration = 
metricsService.timer(getMetricName("import_pre_process_duration"));
-        importPreProcessSuccess = 
metricsService.counter(getMetricName("import_pre_process_success_count"));
-        importPreProcessRequest = 
metricsService.counter(getMetricName("import_pre_process_request_count"));
-        importPostProcessDuration = 
metricsService.timer(getMetricName("import_post_process_duration"));
-        importPostProcessSuccess = 
metricsService.counter(getMetricName("import_post_process_success_count"));
-        importPostProcessRequest = 
metricsService.counter(getMetricName("import_post_process_request_count"));
-        invalidationProcessDuration = 
metricsService.timer(getMetricName("invalidation_process_duration"));
-        invalidationProcessSuccess = 
metricsService.counter(getMetricName("invalidation_process_success_count"));
-        invalidationProcessRequest = 
metricsService.counter(getMetricName("invalidation_process_request_count"));
-        transientImportErrors = 
metricsService.counter(getMetricName("transient_import_errors"));
-        permanentImportErrors = 
metricsService.counter(getMetricName("permanent_import_errors"));
+        tagSubName = Tag.of(TAG_SUB_NAME, subAgentName);
+        tagEditable = Tag.of(TAG_EDITABLE, Boolean.toString(editable));
+        tags = Arrays.asList(
+                tagSubName, 
+                tagEditable);
     }
 
     /**
@@ -112,7 +105,7 @@ public class SubscriberMetrics {
      * @return a Sling Metrics histogram
      */
     public Histogram getImportedPackageSize() {
-        return importedPackageSize;
+        return metricsService.histogram(getMetricName(IMPORTED_PACKAGE_SIZE, 
tags));
     }
 
     /**
@@ -121,7 +114,7 @@ public class SubscriberMetrics {
      * @return a Sling Metrics counter
      */
     public Counter getItemsBufferSize() {
-        return itemsBufferSize;
+        return metricsService.counter(getMetricName(ITEMS_BUFFER_SIZE, tags));
     } 
 
     /**
@@ -130,7 +123,7 @@ public class SubscriberMetrics {
      * @return a Sling Metrics timer
      */
     public Timer getImportedPackageDuration() {
-        return importedPackageDuration;
+        return metricsService.timer(getMetricName(IMPORTED_PACKAGE_DURATION, 
tags));
     }
 
     /**
@@ -139,7 +132,7 @@ public class SubscriberMetrics {
      * @return a Sling Metrics timer
      */
     public Timer getRemovedPackageDuration() {
-        return removedPackageDuration;
+        return metricsService.timer(getMetricName(REMOVED_PACKAGE_DURATION, 
tags));
     }
 
     /**
@@ -148,7 +141,7 @@ public class SubscriberMetrics {
      * @return a Sling Metrics timer
      */
     public Timer getRemovedFailedPackageDuration() {
-        return removedFailedPackageDuration;
+        return 
metricsService.timer(getMetricName(REMOVED_FAILED_PACKAGE_DURATION, tags));
     }
 
     /**
@@ -157,7 +150,7 @@ public class SubscriberMetrics {
      * @return a Sling Metrics meter
      */
     public Meter getFailedPackageImports() {
-        return failedPackageImports;
+        return metricsService.meter(getMetricName(FAILED_PACKAGE_IMPORTS, 
tags));
     }
 
     /**
@@ -166,7 +159,7 @@ public class SubscriberMetrics {
      * @return a Sling Metric timer
      */
     public Timer getSendStoredStatusDuration() {
-        return sendStoredStatusDuration;
+        return metricsService.timer(getMetricName(SEND_STORED_STATUS_DURATION, 
tags));
     }
 
     /**
@@ -175,7 +168,7 @@ public class SubscriberMetrics {
      * @return a Sling Metric timer
      */
     public Timer getProcessQueueItemDuration() {
-        return processQueueItemDuration;
+        return metricsService.timer(getMetricName(PROCESS_QUEUE_ITEM_DURATION, 
tags));
     }
 
     /**
@@ -185,7 +178,7 @@ public class SubscriberMetrics {
      * @return a Sling Metric timer
      */
     public Timer getPackageDistributedDuration() {
-        return packageDistributedDuration;
+        return 
metricsService.timer(getMetricName(REQUEST_DISTRIBUTED_DURATION, tags));
     }
 
     /**
@@ -195,7 +188,7 @@ public class SubscriberMetrics {
      * @return a Sling Metrics timer
      */
     public Timer getPackageJournalDistributionDuration() {
-        return packageJournalDistributionDuration;
+        return 
metricsService.timer(getMetricName(PACKAGE_JOURNAL_DISTRIBUTION_DURATION, 
tags));
     }
 
     /**
@@ -204,63 +197,58 @@ public class SubscriberMetrics {
      * @return a Sling Metric counter
      */
     public Counter getPackageStatusCounter(Status status) {
-        return 
metricsService.counter(getNameWithLabel(getMetricName("package_status_count"), 
"status", status.name()));
+        Tag tagStatus = Tag.of(TAG_STATUS, status.name());
+        String name = getMetricName(PACKAGE_STATUS_COUNT, 
Arrays.asList(tagSubName, tagEditable, tagStatus));
+        return metricsService.counter(name);
     }
 
     public Timer getImportPreProcessDuration() {
-        return importPreProcessDuration;
+        return metricsService.timer(getMetricName(IMPORT_PRE_PROCESS_DURATION, 
tags));
     }
 
     public Counter getImportPreProcessSuccess() {
-        return importPreProcessSuccess;
+        return 
metricsService.counter(getMetricName(IMPORT_PRE_PROCESS_SUCCESS_COUNT, tags));
     }
 
     public Counter getImportPreProcessRequest() {
-        return importPreProcessRequest;
+        return 
metricsService.counter(getMetricName(IMPORT_PRE_PROCESS_REQUEST_COUNT, tags));
     }
 
     public Timer getImportPostProcessDuration() {
-        return importPostProcessDuration;
+        return 
metricsService.timer(getMetricName(IMPORT_POST_PROCESS_DURATION, tags));
     }
 
     public Counter getImportPostProcessSuccess() {
-        return importPostProcessSuccess;
+        return 
metricsService.counter(getMetricName(IMPORT_POST_PROCESS_SUCCESS_COUNT, tags));
     }
 
     public Counter getImportPostProcessRequest() {
-        return importPostProcessRequest;
+        return 
metricsService.counter(getMetricName(IMPORT_POST_PROCESS_REQUEST_COUNT, tags));
     }
 
     public Timer getInvalidationProcessDuration() {
-        return invalidationProcessDuration;
+        return 
metricsService.timer(getMetricName(INVALIDATION_PROCESS_DURATION, tags));
     }
 
     public Counter getInvalidationProcessSuccess() {
-        return invalidationProcessSuccess;
+        return 
metricsService.counter(getMetricName(INVALIDATION_PROCESS_SUCCESS_COUNT, tags));
     }
 
     public Counter getInvalidationProcessRequest() {
-        return invalidationProcessRequest;
+        return 
metricsService.counter(getMetricName(INVALIDATION_PROCESS_REQUEST_COUNT, tags));
     }
 
     public Counter getTransientImportErrors() {
-        return transientImportErrors;
+        return metricsService.counter(getMetricName(TRANSIENT_IMPORT_ERRORS, 
tags));
     }
 
     public Counter getPermanentImportErrors() { 
-        return permanentImportErrors;
+        return metricsService.counter(getMetricName(PERMANENT_IMPORT_ERRORS, 
tags));
     }
 
-    public void currentRetries(String subAgentName, Supplier<Integer> 
retriesCallback) {
-        String nameRetries = SubscriberMetrics.SUB_COMPONENT + 
".current_retries;sub_name=" + subAgentName;
-        metricsService.gauge(nameRetries, retriesCallback);
+    public void currentRetries(Supplier<Integer> retriesCallback) {
+        metricsService.gauge(getMetricName(CURRENT_RETRIES, tags), 
retriesCallback);
     }
     
-    private String getMetricName(String name) {
-        return format("%s.%s", SUB_COMPONENT, name);
-    }
-
-    private String getNameWithLabel(String name, String label, String 
labelVal) {
-        return format("%s;%s=%s", name, label, labelVal);
-    }
 }
+ 
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
index ef76bd1..b624d80 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
@@ -134,6 +134,10 @@ public class DiscoveryService implements Runnable {
     public TopologyView getTopologyView() {
         return viewManager.getCurrentView();
     }
+    
+    public int getSubscriberCount(String pubAgentName) {
+        return getTopologyView().getSubscribedAgentIds(pubAgentName).size();
+    }
 
     @Override
     public void run() {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 2bf24f5..2adec23 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -46,6 +46,7 @@ import 
org.apache.sling.distribution.journal.shared.DistributionLogEventListener
 import org.apache.sling.distribution.journal.shared.Timed;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.DistributionRequestState;
 import org.apache.sling.distribution.DistributionResponse;
@@ -123,7 +124,7 @@ public class DistributionPublisher implements 
DistributionAgent {
             @Reference
             Topics topics,
             @Reference
-            PublishMetrics publishMetrics,
+            MetricsService metricsService,
             @Reference
             PubQueueProvider pubQueueProvider,
             @Reference(target = "(osgi.condition.id=toggle.FT_SLING-12218)", 
cardinality = OPTIONAL, policyOption = GREEDY)
@@ -131,13 +132,15 @@ public class DistributionPublisher implements 
DistributionAgent {
             PublisherConfiguration config,
             BundleContext context) {
 
+        pubAgentName = requireNotBlank(config.name());
+
         this.packageBuilder = packageBuilder;
         this.factory = requireNonNull(factory);
         this.eventAdmin = eventAdmin;
-        this.publishMetrics = requireNonNull(publishMetrics);
+        requireNonNull(metricsService);
+        this.publishMetrics = new PublishMetrics(metricsService, pubAgentName);
         this.pubQueueProvider = pubQueueProvider;
 
-        pubAgentName = requireNotBlank(config.name());
         distLog = new DefaultDistributionLog(pubAgentName, this.getClass(), 
DefaultDistributionLog.LogLevel.INFO);
         distributionLogEventListener = new 
DistributionLogEventListener(context, distLog, pubAgentName);
 
@@ -148,9 +151,7 @@ public class DistributionPublisher implements 
DistributionAgent {
         pkgType = packageBuilder.getType();
 
         this.sender = messagingProvider.createSender(topics.getPackageTopic());
-        publishMetrics.subscriberCount(pubAgentName,
-                () -> 
discoveryService.getTopologyView().getSubscribedAgentIds(pubAgentName).size()
-        );
+        publishMetrics.subscriberCount(() -> 
discoveryService.getSubscriberCount(pubAgentName));
         
         distLog.info("Started Publisher agent={} with packageBuilder={}, 
limitEnabled={}, queuedTimeout={}, queueSizeLimit={}, maxQueueSizeDelay={}",
                 pubAgentName, pkgType, limitEnabled, queuedTimeout, 
queueSizeLimit, maxQueueSizeDelay);
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java
index ce53518..ef64ad5 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java
@@ -18,8 +18,10 @@
  */
 package org.apache.sling.distribution.journal.impl.publisher;
 
-import static java.lang.String.format;
+import static 
org.apache.sling.distribution.journal.metrics.TaggedMetrics.getMetricName;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.function.Supplier;
 
 import org.apache.sling.commons.metrics.Counter;
@@ -27,43 +29,27 @@ import org.apache.sling.commons.metrics.Histogram;
 import org.apache.sling.commons.metrics.Meter;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.Timer;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
+import org.apache.sling.distribution.journal.metrics.Tag;
 
-@Component(service = PublishMetrics.class)
 public class PublishMetrics {
-
-    public static final String BASE_COMPONENT = "distribution.journal";
-
-    public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
-
+    private static final String TAG_AGENT_NAME = "pub_name";
+
+    public static final String PUB_COMPONENT = 
"distribution.journal.publisher.";
+    private static final String EXPORTED_PACKAGE_SIZE = PUB_COMPONENT + 
"exported_package_size";
+    private static final String ACCEPTED_REQUESTS = PUB_COMPONENT + 
"accepted_requests";
+    private static final String DROPPED_REQUESTS = PUB_COMPONENT + 
"dropped_requests";
+    private static final String BUILD_PACKAGE_DURATION = PUB_COMPONENT + 
"build_package_duration";
+    private static final String ENQUEUE_PACKAGE_DURATION = PUB_COMPONENT + 
"enqueue_package_duration";
+    private static final String QUEUE_CACHE_FETCH_COUNT = PUB_COMPONENT + 
"queue_cache_fetch_count";
+    private static final String QUEUE_ACCESS_ERROR_COUNT = PUB_COMPONENT + 
"queue_access_error_count";
+    private static final String SUBSCRIBER_COUNT = PUB_COMPONENT + 
"subscriber_count";
+
+    private final List<Tag> tags;
     private final MetricsService metricsService;
 
-    private final  Histogram exportedPackageSize;
-
-    private final  Meter acceptedRequests;
-
-    private final  Meter droppedRequests;
-
-    private final  Timer buildPackageDuration;
-
-    private final  Timer enqueuePackageDuration;
-
-    private final  Counter queueCacheFetchCount;
-
-    private final  Counter queueAccessErrorCount;
-
-    @Activate
-    public PublishMetrics(@Reference MetricsService metricsService) {
+    public PublishMetrics(MetricsService metricsService, String pubAgentName) {
+        this.tags = Arrays.asList(Tag.of(TAG_AGENT_NAME, pubAgentName));
         this.metricsService = metricsService;
-        exportedPackageSize = 
metricsService.histogram(getMetricName("exported_package_size"));
-        acceptedRequests = 
metricsService.meter(getMetricName("accepted_requests"));
-        droppedRequests = 
metricsService.meter(getMetricName("dropped_requests"));
-        buildPackageDuration = 
metricsService.timer(getMetricName("build_package_duration"));
-        enqueuePackageDuration = 
metricsService.timer(getMetricName("enqueue_package_duration"));
-        queueCacheFetchCount = 
metricsService.counter(getMetricName("queue_cache_fetch_count"));
-        queueAccessErrorCount = 
metricsService.counter(getMetricName("queue_access_error_count"));
     }
 
     /**
@@ -72,7 +58,7 @@ public class PublishMetrics {
      * @return a Sling Metrics histogram
      */
     public Histogram getExportedPackageSize() {
-        return exportedPackageSize;
+        return metricsService.histogram(getMetricName(EXPORTED_PACKAGE_SIZE, 
tags));
     }
 
     /**
@@ -81,7 +67,7 @@ public class PublishMetrics {
      * @return a Sling Metrics meter
      */
     public Meter getAcceptedRequests() {
-        return acceptedRequests;
+        return metricsService.meter(getMetricName(ACCEPTED_REQUESTS, tags));
     }
 
     /**
@@ -90,7 +76,7 @@ public class PublishMetrics {
      * @return a Sling Metrics meter
      */
     public Meter getDroppedRequests() {
-        return droppedRequests;
+        return metricsService.meter(getMetricName(DROPPED_REQUESTS, tags));
     }
 
     /**
@@ -99,7 +85,7 @@ public class PublishMetrics {
      * @return a Sling Metric timer
      */
     public Timer getBuildPackageDuration() {
-        return buildPackageDuration;
+        return metricsService.timer(getMetricName(BUILD_PACKAGE_DURATION, 
tags));
     }
 
     /**
@@ -108,7 +94,7 @@ public class PublishMetrics {
      * @return a Sling Metric timer
      */
     public Timer getEnqueuePackageDuration() {
-        return enqueuePackageDuration;
+        return metricsService.timer(getMetricName(ENQUEUE_PACKAGE_DURATION, 
tags));
     }
 
     /**
@@ -117,7 +103,7 @@ public class PublishMetrics {
      * @return a Sling Metric counter
      */
     public Counter getQueueCacheFetchCount() {
-        return queueCacheFetchCount;
+        return metricsService.counter(getMetricName(QUEUE_CACHE_FETCH_COUNT, 
tags));
     }
 
     /**
@@ -126,17 +112,11 @@ public class PublishMetrics {
      * @return a Sling Metric counter
      */
     public Counter getQueueAccessErrorCount() {
-        return queueAccessErrorCount;
-    }
-
-    public void subscriberCount(String pubAgentName, Supplier<Integer> 
subscriberCountCallback) {
-        metricsService.gauge(PublishMetrics.PUB_COMPONENT + 
".subscriber_count;pub_name=" + pubAgentName,
-                subscriberCountCallback);
-        
+        return metricsService.counter(getMetricName(QUEUE_ACCESS_ERROR_COUNT, 
tags));
     }
 
-    private String getMetricName(String name) {
-        return format("%s.%s", PUB_COMPONENT, name);
+    public void subscriberCount(Supplier<Integer> subscriberCountCallback) {
+        metricsService.gauge(getMetricName(SUBSCRIBER_COUNT, tags), 
subscriberCountCallback);
     }
 
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index f9fc84f..ff33c6f 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -52,6 +52,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.jackrabbit.util.Text;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.distribution.ImportPostProcessException;
 import org.apache.sling.distribution.agent.DistributionAgentState;
@@ -119,7 +120,7 @@ public class DistributionSubscriber {
     private Precondition precondition;
 
     @Reference
-    private SubscriberMetrics subscriberMetrics;
+    private MetricsService metricsService;
 
     @Reference
     BookKeeperFactory bookKeeperFactory;
@@ -127,6 +128,8 @@ public class DistributionSubscriber {
     @Reference
     private SubscriberReadyStore subscriberReadyStore;
 
+    private SubscriberMetrics subscriberMetrics;
+
     private volatile Closeable idleReadyCheck; // NOSONAR
 
     private volatile IdleCheck idleCheck; // NOSONAR
@@ -162,12 +165,15 @@ public class DistributionSubscriber {
         subAgentName = requireNotBlank(config.name());
         requireNonNull(config);
         requireNonNull(context);
+        requireNonNull(metricsService);
         requireNonNull(packageBuilder);
         requireNonNull(slingSettings);
         requireNonNull(messagingProvider);
         requireNonNull(topics);
         requireNonNull(precondition);
         requireNonNull(bookKeeperFactory);
+        
+        this.subscriberMetrics = new SubscriberMetrics(metricsService, 
subAgentName, config.editable());
 
         if (config.editable()) {
             commandPoller = new CommandPoller(messagingProvider, topics, 
subSlingId, subAgentName, delay::signal);
@@ -197,7 +203,7 @@ public class DistributionSubscriber {
                 config.packageHandling(),
                 packageNodeName,
                 
config.contentPackageExtractorOverwritePrimaryTypesOfFolders());
-        bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, 
statusSender, logSender);
+        bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, 
statusSender, logSender, this.subscriberMetrics);
 
         long startOffset = bookKeeper.loadOffset() + 1;
         String assign = startOffset > 0 ? 
messagingProvider.assignTo(startOffset) : null;
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/metrics/TaggedMetrics.java
 
b/src/main/java/org/apache/sling/distribution/journal/metrics/TaggedMetrics.java
index 0c52dd7..b34f294 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/metrics/TaggedMetrics.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/metrics/TaggedMetrics.java
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.distribution.journal.metrics;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -37,4 +38,8 @@ public final class TaggedMetrics {
         log.debug("metric={}", metric);
         return metric;
     }
+    
+    public static String getMetricName(String metricName, Tag tag) {
+        return getMetricName(metricName, Collections.singletonList(tag));
+    }
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index bf72a26..2663457 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -88,9 +88,8 @@ public class BookKeeperTest {
 
     @Before
     public void before() {
-        subscriberMetrics = new SubscriberMetrics(MetricsService.NOOP);
-
         BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", 
"subSlingId", true, 10, PackageHandling.Extract, "package", true);
+        subscriberMetrics = new SubscriberMetrics(MetricsService.NOOP, 
bkConfig.getSubAgentName(), bkConfig.isEditable());
         bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics, 
packageHandler, eventAdmin, sender, logSender, bkConfig,
                 importPreProcessor, importPostProcessor, 
invalidationProcessor);
     }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index d7541e9..206cb29 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -47,7 +47,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.time.StopWatch;
 import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.commons.metrics.Counter;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.internal.MetricsServiceImpl;
 import org.apache.sling.distribution.DistributionRequest;
@@ -108,8 +107,6 @@ public class DistributionPublisherTest {
     @Mock
     private DistributionPackageBuilder packageBuilder;
 
-    private PublishMetrics publishMetrics;
-
     private OsgiContext context = new OsgiContext();
 
     private DistributionPublisher publisher;
@@ -128,11 +125,12 @@ public class DistributionPublisherTest {
 
     @Spy
     private Topics topics = new Topics();
+    
+    private MetricsService metricsService;
 
     @Before
     public void before() throws Exception {
-        MetricsService metricsService = 
context.registerInjectActivateService(MetricsServiceImpl.class);
-        publishMetrics = new PublishMetrics(metricsService);
+        metricsService = 
context.registerInjectActivateService(MetricsServiceImpl.class);
         when(packageBuilder.getType()).thenReturn("journal");
         Map<String, String> props = Map.of("name", PUB1AGENT1,
                 "maxQueueSizeDelay", "1000");
@@ -141,7 +139,7 @@ public class DistributionPublisherTest {
         BundleContext bcontext = context.bundleContext();
         
when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender);
         publisher = new DistributionPublisher(messagingProvider, 
packageBuilder, discoveryService, factory,
-                eventAdmin, topics, publishMetrics, pubQueueProvider, 
Condition.INSTANCE, config, bcontext);
+                eventAdmin, topics, metricsService, pubQueueProvider, 
Condition.INSTANCE, config, bcontext);
         when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier);
     }
     
@@ -250,8 +248,8 @@ public class DistributionPublisherTest {
 
         DistributionQueue queue = publisher.getQueue("i_am_not_a_queue");
         assertNull(queue);
-        Counter counter = publishMetrics.getQueueAccessErrorCount();
-        assertEquals("Wrong queue counter expected",1, counter.getCount());
+        long count = getQueueAccessErrorCount();
+        assertEquals("Wrong queue counter expected",1, count);
     }
 
     @Test
@@ -264,8 +262,12 @@ public class DistributionPublisherTest {
             fail("Expected exception not thrown");
         } catch (RuntimeException expectedException) {
         }
-        Counter counter = publishMetrics.getQueueAccessErrorCount();
-        assertEquals("Wrong getQueue error counter",1, counter.getCount());
+        long count = getQueueAccessErrorCount();
+        assertEquals("Wrong getQueue error counter",1, count);
+    }
+
+    private long getQueueAccessErrorCount() {
+        return new PublishMetrics(metricsService, 
PUB1AGENT1).getQueueAccessErrorCount().getCount();
     }
 
     @Test(expected = DistributionException.class)
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index c73b4b5..56ab9fc 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -74,7 +74,6 @@ import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.bookkeeper.BookKeeper;
 import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory;
 import org.apache.sling.distribution.journal.bookkeeper.LocalStore;
-import org.apache.sling.distribution.journal.bookkeeper.SubscriberMetrics;
 import org.apache.sling.distribution.journal.shared.NoOpImportPreProcessor;
 import org.apache.sling.distribution.journal.shared.NoOpImportPostProcessor;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
@@ -113,7 +112,6 @@ import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.util.converter.Converters;
 
-@SuppressWarnings("unchecked")
 @RunWith(MockitoJUnitRunner.class)
 public class SubscriberTest {
 
@@ -180,7 +178,7 @@ public class SubscriberTest {
     private MessageSender<PackageStatusMessage> statusSender;
 
     @Spy
-    private SubscriberMetrics subscriberMetrics = new 
SubscriberMetrics(MetricsService.NOOP);
+    private MetricsService metricsService = MetricsService.NOOP;
 
     @Spy
     private ImportPreProcessor importPreProcessor = new 
NoOpImportPreProcessor();
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java
index 85eb95c..a3f0878 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java
@@ -42,7 +42,7 @@ public class SubscriberMetricsTest {
     @Before
     public void before() {
         MetricsService metricsService = MetricsService.NOOP;
-        metrics = new SubscriberMetrics(metricsService);
+        metrics = new SubscriberMetrics(metricsService, "subAgentName", true);
     }
 
     public static void mockBehaviour(MetricsService metricsService) {


Reply via email to