This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push: new 14854a4 SLING-12292 - Add tags to publish metrics 14854a4 is described below commit 14854a40ebc8dc206ed45c36f97b13234b3156d3 Author: Christian Schneider <cschn...@adobe.com> AuthorDate: Tue Apr 16 11:55:49 2024 +0200 SLING-12292 - Add tags to publish metrics --- .../journal/impl/discovery/DiscoveryService.java | 4 ++ .../impl/publisher/DistributionPublisher.java | 13 ++-- .../journal/impl/publisher/PublishMetrics.java | 76 ++++++++-------------- .../impl/publisher/DistributionPublisherTest.java | 22 ++++--- 4 files changed, 51 insertions(+), 64 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java index ef76bd1..b624d80 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java @@ -134,6 +134,10 @@ public class DiscoveryService implements Runnable { public TopologyView getTopologyView() { return viewManager.getCurrentView(); } + + public int getSubscriberCount(String pubAgentName) { + return getTopologyView().getSubscribedAgentIds(pubAgentName).size(); + } @Override public void run() { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java index 2bf24f5..2adec23 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java @@ -46,6 +46,7 @@ import org.apache.sling.distribution.journal.shared.DistributionLogEventListener import org.apache.sling.distribution.journal.shared.Timed; import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.commons.metrics.MetricsService; import org.apache.sling.distribution.DistributionRequest; import org.apache.sling.distribution.DistributionRequestState; import org.apache.sling.distribution.DistributionResponse; @@ -123,7 +124,7 @@ public class DistributionPublisher implements DistributionAgent { @Reference Topics topics, @Reference - PublishMetrics publishMetrics, + MetricsService metricsService, @Reference PubQueueProvider pubQueueProvider, @Reference(target = "(osgi.condition.id=toggle.FT_SLING-12218)", cardinality = OPTIONAL, policyOption = GREEDY) @@ -131,13 +132,15 @@ public class DistributionPublisher implements DistributionAgent { PublisherConfiguration config, BundleContext context) { + pubAgentName = requireNotBlank(config.name()); + this.packageBuilder = packageBuilder; this.factory = requireNonNull(factory); this.eventAdmin = eventAdmin; - this.publishMetrics = requireNonNull(publishMetrics); + requireNonNull(metricsService); + this.publishMetrics = new PublishMetrics(metricsService, pubAgentName); this.pubQueueProvider = pubQueueProvider; - pubAgentName = requireNotBlank(config.name()); distLog = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO); distributionLogEventListener = new DistributionLogEventListener(context, distLog, pubAgentName); @@ -148,9 +151,7 @@ public class DistributionPublisher implements DistributionAgent { pkgType = packageBuilder.getType(); this.sender = messagingProvider.createSender(topics.getPackageTopic()); - publishMetrics.subscriberCount(pubAgentName, - () -> discoveryService.getTopologyView().getSubscribedAgentIds(pubAgentName).size() - ); + publishMetrics.subscriberCount(() -> discoveryService.getSubscriberCount(pubAgentName)); distLog.info("Started Publisher agent={} with packageBuilder={}, limitEnabled={}, queuedTimeout={}, queueSizeLimit={}, maxQueueSizeDelay={}", pubAgentName, pkgType, limitEnabled, queuedTimeout, queueSizeLimit, maxQueueSizeDelay); diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java index ce53518..ef64ad5 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java @@ -18,8 +18,10 @@ */ package org.apache.sling.distribution.journal.impl.publisher; -import static java.lang.String.format; +import static org.apache.sling.distribution.journal.metrics.TaggedMetrics.getMetricName; +import java.util.Arrays; +import java.util.List; import java.util.function.Supplier; import org.apache.sling.commons.metrics.Counter; @@ -27,43 +29,27 @@ import org.apache.sling.commons.metrics.Histogram; import org.apache.sling.commons.metrics.Meter; import org.apache.sling.commons.metrics.MetricsService; import org.apache.sling.commons.metrics.Timer; -import org.osgi.service.component.annotations.Activate; -import org.osgi.service.component.annotations.Component; -import org.osgi.service.component.annotations.Reference; +import org.apache.sling.distribution.journal.metrics.Tag; -@Component(service = PublishMetrics.class) public class PublishMetrics { - - public static final String BASE_COMPONENT = "distribution.journal"; - - public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher"; - + private static final String TAG_AGENT_NAME = "pub_name"; + + public static final String PUB_COMPONENT = "distribution.journal.publisher."; + private static final String EXPORTED_PACKAGE_SIZE = PUB_COMPONENT + "exported_package_size"; + private static final String ACCEPTED_REQUESTS = PUB_COMPONENT + "accepted_requests"; + private static final String DROPPED_REQUESTS = PUB_COMPONENT + "dropped_requests"; + private static final String BUILD_PACKAGE_DURATION = PUB_COMPONENT + "build_package_duration"; + private static final String ENQUEUE_PACKAGE_DURATION = PUB_COMPONENT + "enqueue_package_duration"; + private static final String QUEUE_CACHE_FETCH_COUNT = PUB_COMPONENT + "queue_cache_fetch_count"; + private static final String QUEUE_ACCESS_ERROR_COUNT = PUB_COMPONENT + "queue_access_error_count"; + private static final String SUBSCRIBER_COUNT = PUB_COMPONENT + "subscriber_count"; + + private final List<Tag> tags; private final MetricsService metricsService; - private final Histogram exportedPackageSize; - - private final Meter acceptedRequests; - - private final Meter droppedRequests; - - private final Timer buildPackageDuration; - - private final Timer enqueuePackageDuration; - - private final Counter queueCacheFetchCount; - - private final Counter queueAccessErrorCount; - - @Activate - public PublishMetrics(@Reference MetricsService metricsService) { + public PublishMetrics(MetricsService metricsService, String pubAgentName) { + this.tags = Arrays.asList(Tag.of(TAG_AGENT_NAME, pubAgentName)); this.metricsService = metricsService; - exportedPackageSize = metricsService.histogram(getMetricName("exported_package_size")); - acceptedRequests = metricsService.meter(getMetricName("accepted_requests")); - droppedRequests = metricsService.meter(getMetricName("dropped_requests")); - buildPackageDuration = metricsService.timer(getMetricName("build_package_duration")); - enqueuePackageDuration = metricsService.timer(getMetricName("enqueue_package_duration")); - queueCacheFetchCount = metricsService.counter(getMetricName("queue_cache_fetch_count")); - queueAccessErrorCount = metricsService.counter(getMetricName("queue_access_error_count")); } /** @@ -72,7 +58,7 @@ public class PublishMetrics { * @return a Sling Metrics histogram */ public Histogram getExportedPackageSize() { - return exportedPackageSize; + return metricsService.histogram(getMetricName(EXPORTED_PACKAGE_SIZE, tags)); } /** @@ -81,7 +67,7 @@ public class PublishMetrics { * @return a Sling Metrics meter */ public Meter getAcceptedRequests() { - return acceptedRequests; + return metricsService.meter(getMetricName(ACCEPTED_REQUESTS, tags)); } /** @@ -90,7 +76,7 @@ public class PublishMetrics { * @return a Sling Metrics meter */ public Meter getDroppedRequests() { - return droppedRequests; + return metricsService.meter(getMetricName(DROPPED_REQUESTS, tags)); } /** @@ -99,7 +85,7 @@ public class PublishMetrics { * @return a Sling Metric timer */ public Timer getBuildPackageDuration() { - return buildPackageDuration; + return metricsService.timer(getMetricName(BUILD_PACKAGE_DURATION, tags)); } /** @@ -108,7 +94,7 @@ public class PublishMetrics { * @return a Sling Metric timer */ public Timer getEnqueuePackageDuration() { - return enqueuePackageDuration; + return metricsService.timer(getMetricName(ENQUEUE_PACKAGE_DURATION, tags)); } /** @@ -117,7 +103,7 @@ public class PublishMetrics { * @return a Sling Metric counter */ public Counter getQueueCacheFetchCount() { - return queueCacheFetchCount; + return metricsService.counter(getMetricName(QUEUE_CACHE_FETCH_COUNT, tags)); } /** @@ -126,17 +112,11 @@ public class PublishMetrics { * @return a Sling Metric counter */ public Counter getQueueAccessErrorCount() { - return queueAccessErrorCount; - } - - public void subscriberCount(String pubAgentName, Supplier<Integer> subscriberCountCallback) { - metricsService.gauge(PublishMetrics.PUB_COMPONENT + ".subscriber_count;pub_name=" + pubAgentName, - subscriberCountCallback); - + return metricsService.counter(getMetricName(QUEUE_ACCESS_ERROR_COUNT, tags)); } - private String getMetricName(String name) { - return format("%s.%s", PUB_COMPONENT, name); + public void subscriberCount(Supplier<Integer> subscriberCountCallback) { + metricsService.gauge(getMetricName(SUBSCRIBER_COUNT, tags), subscriberCountCallback); } } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java index d7541e9..206cb29 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java @@ -47,7 +47,6 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.time.StopWatch; import org.apache.sling.api.resource.ResourceResolver; -import org.apache.sling.commons.metrics.Counter; import org.apache.sling.commons.metrics.MetricsService; import org.apache.sling.commons.metrics.internal.MetricsServiceImpl; import org.apache.sling.distribution.DistributionRequest; @@ -108,8 +107,6 @@ public class DistributionPublisherTest { @Mock private DistributionPackageBuilder packageBuilder; - private PublishMetrics publishMetrics; - private OsgiContext context = new OsgiContext(); private DistributionPublisher publisher; @@ -128,11 +125,12 @@ public class DistributionPublisherTest { @Spy private Topics topics = new Topics(); + + private MetricsService metricsService; @Before public void before() throws Exception { - MetricsService metricsService = context.registerInjectActivateService(MetricsServiceImpl.class); - publishMetrics = new PublishMetrics(metricsService); + metricsService = context.registerInjectActivateService(MetricsServiceImpl.class); when(packageBuilder.getType()).thenReturn("journal"); Map<String, String> props = Map.of("name", PUB1AGENT1, "maxQueueSizeDelay", "1000"); @@ -141,7 +139,7 @@ public class DistributionPublisherTest { BundleContext bcontext = context.bundleContext(); when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender); publisher = new DistributionPublisher(messagingProvider, packageBuilder, discoveryService, factory, - eventAdmin, topics, publishMetrics, pubQueueProvider, Condition.INSTANCE, config, bcontext); + eventAdmin, topics, metricsService, pubQueueProvider, Condition.INSTANCE, config, bcontext); when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier); } @@ -250,8 +248,8 @@ public class DistributionPublisherTest { DistributionQueue queue = publisher.getQueue("i_am_not_a_queue"); assertNull(queue); - Counter counter = publishMetrics.getQueueAccessErrorCount(); - assertEquals("Wrong queue counter expected",1, counter.getCount()); + long count = getQueueAccessErrorCount(); + assertEquals("Wrong queue counter expected",1, count); } @Test @@ -264,8 +262,12 @@ public class DistributionPublisherTest { fail("Expected exception not thrown"); } catch (RuntimeException expectedException) { } - Counter counter = publishMetrics.getQueueAccessErrorCount(); - assertEquals("Wrong getQueue error counter",1, counter.getCount()); + long count = getQueueAccessErrorCount(); + assertEquals("Wrong getQueue error counter",1, count); + } + + private long getQueueAccessErrorCount() { + return new PublishMetrics(metricsService, PUB1AGENT1).getQueueAccessErrorCount().getCount(); } @Test(expected = DistributionException.class)