This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 14854a4  SLING-12292 - Add tags to publish metrics
14854a4 is described below

commit 14854a40ebc8dc206ed45c36f97b13234b3156d3
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Tue Apr 16 11:55:49 2024 +0200

    SLING-12292 - Add tags to publish metrics
---
 .../journal/impl/discovery/DiscoveryService.java   |  4 ++
 .../impl/publisher/DistributionPublisher.java      | 13 ++--
 .../journal/impl/publisher/PublishMetrics.java     | 76 ++++++++--------------
 .../impl/publisher/DistributionPublisherTest.java  | 22 ++++---
 4 files changed, 51 insertions(+), 64 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
index ef76bd1..b624d80 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
@@ -134,6 +134,10 @@ public class DiscoveryService implements Runnable {
     public TopologyView getTopologyView() {
         return viewManager.getCurrentView();
     }
+    
+    public int getSubscriberCount(String pubAgentName) {
+        return getTopologyView().getSubscribedAgentIds(pubAgentName).size();
+    }
 
     @Override
     public void run() {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 2bf24f5..2adec23 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -46,6 +46,7 @@ import 
org.apache.sling.distribution.journal.shared.DistributionLogEventListener
 import org.apache.sling.distribution.journal.shared.Timed;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.DistributionRequestState;
 import org.apache.sling.distribution.DistributionResponse;
@@ -123,7 +124,7 @@ public class DistributionPublisher implements 
DistributionAgent {
             @Reference
             Topics topics,
             @Reference
-            PublishMetrics publishMetrics,
+            MetricsService metricsService,
             @Reference
             PubQueueProvider pubQueueProvider,
             @Reference(target = "(osgi.condition.id=toggle.FT_SLING-12218)", 
cardinality = OPTIONAL, policyOption = GREEDY)
@@ -131,13 +132,15 @@ public class DistributionPublisher implements 
DistributionAgent {
             PublisherConfiguration config,
             BundleContext context) {
 
+        pubAgentName = requireNotBlank(config.name());
+
         this.packageBuilder = packageBuilder;
         this.factory = requireNonNull(factory);
         this.eventAdmin = eventAdmin;
-        this.publishMetrics = requireNonNull(publishMetrics);
+        requireNonNull(metricsService);
+        this.publishMetrics = new PublishMetrics(metricsService, pubAgentName);
         this.pubQueueProvider = pubQueueProvider;
 
-        pubAgentName = requireNotBlank(config.name());
         distLog = new DefaultDistributionLog(pubAgentName, this.getClass(), 
DefaultDistributionLog.LogLevel.INFO);
         distributionLogEventListener = new 
DistributionLogEventListener(context, distLog, pubAgentName);
 
@@ -148,9 +151,7 @@ public class DistributionPublisher implements 
DistributionAgent {
         pkgType = packageBuilder.getType();
 
         this.sender = messagingProvider.createSender(topics.getPackageTopic());
-        publishMetrics.subscriberCount(pubAgentName,
-                () -> 
discoveryService.getTopologyView().getSubscribedAgentIds(pubAgentName).size()
-        );
+        publishMetrics.subscriberCount(() -> 
discoveryService.getSubscriberCount(pubAgentName));
         
         distLog.info("Started Publisher agent={} with packageBuilder={}, 
limitEnabled={}, queuedTimeout={}, queueSizeLimit={}, maxQueueSizeDelay={}",
                 pubAgentName, pkgType, limitEnabled, queuedTimeout, 
queueSizeLimit, maxQueueSizeDelay);
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java
index ce53518..ef64ad5 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java
@@ -18,8 +18,10 @@
  */
 package org.apache.sling.distribution.journal.impl.publisher;
 
-import static java.lang.String.format;
+import static 
org.apache.sling.distribution.journal.metrics.TaggedMetrics.getMetricName;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.function.Supplier;
 
 import org.apache.sling.commons.metrics.Counter;
@@ -27,43 +29,27 @@ import org.apache.sling.commons.metrics.Histogram;
 import org.apache.sling.commons.metrics.Meter;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.Timer;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
+import org.apache.sling.distribution.journal.metrics.Tag;
 
-@Component(service = PublishMetrics.class)
 public class PublishMetrics {
-
-    public static final String BASE_COMPONENT = "distribution.journal";
-
-    public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
-
+    private static final String TAG_AGENT_NAME = "pub_name";
+
+    public static final String PUB_COMPONENT = 
"distribution.journal.publisher.";
+    private static final String EXPORTED_PACKAGE_SIZE = PUB_COMPONENT + 
"exported_package_size";
+    private static final String ACCEPTED_REQUESTS = PUB_COMPONENT + 
"accepted_requests";
+    private static final String DROPPED_REQUESTS = PUB_COMPONENT + 
"dropped_requests";
+    private static final String BUILD_PACKAGE_DURATION = PUB_COMPONENT + 
"build_package_duration";
+    private static final String ENQUEUE_PACKAGE_DURATION = PUB_COMPONENT + 
"enqueue_package_duration";
+    private static final String QUEUE_CACHE_FETCH_COUNT = PUB_COMPONENT + 
"queue_cache_fetch_count";
+    private static final String QUEUE_ACCESS_ERROR_COUNT = PUB_COMPONENT + 
"queue_access_error_count";
+    private static final String SUBSCRIBER_COUNT = PUB_COMPONENT + 
"subscriber_count";
+
+    private final List<Tag> tags;
     private final MetricsService metricsService;
 
-    private final  Histogram exportedPackageSize;
-
-    private final  Meter acceptedRequests;
-
-    private final  Meter droppedRequests;
-
-    private final  Timer buildPackageDuration;
-
-    private final  Timer enqueuePackageDuration;
-
-    private final  Counter queueCacheFetchCount;
-
-    private final  Counter queueAccessErrorCount;
-
-    @Activate
-    public PublishMetrics(@Reference MetricsService metricsService) {
+    public PublishMetrics(MetricsService metricsService, String pubAgentName) {
+        this.tags = Arrays.asList(Tag.of(TAG_AGENT_NAME, pubAgentName));
         this.metricsService = metricsService;
-        exportedPackageSize = 
metricsService.histogram(getMetricName("exported_package_size"));
-        acceptedRequests = 
metricsService.meter(getMetricName("accepted_requests"));
-        droppedRequests = 
metricsService.meter(getMetricName("dropped_requests"));
-        buildPackageDuration = 
metricsService.timer(getMetricName("build_package_duration"));
-        enqueuePackageDuration = 
metricsService.timer(getMetricName("enqueue_package_duration"));
-        queueCacheFetchCount = 
metricsService.counter(getMetricName("queue_cache_fetch_count"));
-        queueAccessErrorCount = 
metricsService.counter(getMetricName("queue_access_error_count"));
     }
 
     /**
@@ -72,7 +58,7 @@ public class PublishMetrics {
      * @return a Sling Metrics histogram
      */
     public Histogram getExportedPackageSize() {
-        return exportedPackageSize;
+        return metricsService.histogram(getMetricName(EXPORTED_PACKAGE_SIZE, 
tags));
     }
 
     /**
@@ -81,7 +67,7 @@ public class PublishMetrics {
      * @return a Sling Metrics meter
      */
     public Meter getAcceptedRequests() {
-        return acceptedRequests;
+        return metricsService.meter(getMetricName(ACCEPTED_REQUESTS, tags));
     }
 
     /**
@@ -90,7 +76,7 @@ public class PublishMetrics {
      * @return a Sling Metrics meter
      */
     public Meter getDroppedRequests() {
-        return droppedRequests;
+        return metricsService.meter(getMetricName(DROPPED_REQUESTS, tags));
     }
 
     /**
@@ -99,7 +85,7 @@ public class PublishMetrics {
      * @return a Sling Metric timer
      */
     public Timer getBuildPackageDuration() {
-        return buildPackageDuration;
+        return metricsService.timer(getMetricName(BUILD_PACKAGE_DURATION, 
tags));
     }
 
     /**
@@ -108,7 +94,7 @@ public class PublishMetrics {
      * @return a Sling Metric timer
      */
     public Timer getEnqueuePackageDuration() {
-        return enqueuePackageDuration;
+        return metricsService.timer(getMetricName(ENQUEUE_PACKAGE_DURATION, 
tags));
     }
 
     /**
@@ -117,7 +103,7 @@ public class PublishMetrics {
      * @return a Sling Metric counter
      */
     public Counter getQueueCacheFetchCount() {
-        return queueCacheFetchCount;
+        return metricsService.counter(getMetricName(QUEUE_CACHE_FETCH_COUNT, 
tags));
     }
 
     /**
@@ -126,17 +112,11 @@ public class PublishMetrics {
      * @return a Sling Metric counter
      */
     public Counter getQueueAccessErrorCount() {
-        return queueAccessErrorCount;
-    }
-
-    public void subscriberCount(String pubAgentName, Supplier<Integer> 
subscriberCountCallback) {
-        metricsService.gauge(PublishMetrics.PUB_COMPONENT + 
".subscriber_count;pub_name=" + pubAgentName,
-                subscriberCountCallback);
-        
+        return metricsService.counter(getMetricName(QUEUE_ACCESS_ERROR_COUNT, 
tags));
     }
 
-    private String getMetricName(String name) {
-        return format("%s.%s", PUB_COMPONENT, name);
+    public void subscriberCount(Supplier<Integer> subscriberCountCallback) {
+        metricsService.gauge(getMetricName(SUBSCRIBER_COUNT, tags), 
subscriberCountCallback);
     }
 
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index d7541e9..206cb29 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -47,7 +47,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.time.StopWatch;
 import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.commons.metrics.Counter;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.internal.MetricsServiceImpl;
 import org.apache.sling.distribution.DistributionRequest;
@@ -108,8 +107,6 @@ public class DistributionPublisherTest {
     @Mock
     private DistributionPackageBuilder packageBuilder;
 
-    private PublishMetrics publishMetrics;
-
     private OsgiContext context = new OsgiContext();
 
     private DistributionPublisher publisher;
@@ -128,11 +125,12 @@ public class DistributionPublisherTest {
 
     @Spy
     private Topics topics = new Topics();
+    
+    private MetricsService metricsService;
 
     @Before
     public void before() throws Exception {
-        MetricsService metricsService = 
context.registerInjectActivateService(MetricsServiceImpl.class);
-        publishMetrics = new PublishMetrics(metricsService);
+        metricsService = 
context.registerInjectActivateService(MetricsServiceImpl.class);
         when(packageBuilder.getType()).thenReturn("journal");
         Map<String, String> props = Map.of("name", PUB1AGENT1,
                 "maxQueueSizeDelay", "1000");
@@ -141,7 +139,7 @@ public class DistributionPublisherTest {
         BundleContext bcontext = context.bundleContext();
         
when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender);
         publisher = new DistributionPublisher(messagingProvider, 
packageBuilder, discoveryService, factory,
-                eventAdmin, topics, publishMetrics, pubQueueProvider, 
Condition.INSTANCE, config, bcontext);
+                eventAdmin, topics, metricsService, pubQueueProvider, 
Condition.INSTANCE, config, bcontext);
         when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier);
     }
     
@@ -250,8 +248,8 @@ public class DistributionPublisherTest {
 
         DistributionQueue queue = publisher.getQueue("i_am_not_a_queue");
         assertNull(queue);
-        Counter counter = publishMetrics.getQueueAccessErrorCount();
-        assertEquals("Wrong queue counter expected",1, counter.getCount());
+        long count = getQueueAccessErrorCount();
+        assertEquals("Wrong queue counter expected",1, count);
     }
 
     @Test
@@ -264,8 +262,12 @@ public class DistributionPublisherTest {
             fail("Expected exception not thrown");
         } catch (RuntimeException expectedException) {
         }
-        Counter counter = publishMetrics.getQueueAccessErrorCount();
-        assertEquals("Wrong getQueue error counter",1, counter.getCount());
+        long count = getQueueAccessErrorCount();
+        assertEquals("Wrong getQueue error counter",1, count);
+    }
+
+    private long getQueueAccessErrorCount() {
+        return new PublishMetrics(metricsService, 
PUB1AGENT1).getQueueAccessErrorCount().getCount();
     }
 
     @Test(expected = DistributionException.class)

Reply via email to