This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 5c2aa91f7494af3378866f032176e2f1a7d64766 Author: Christian Schneider <cschn...@adobe.com> AuthorDate: Fri Apr 12 14:47:44 2024 +0200 SLING-12292 - Split metrics into publisher and subscriber --- .../journal/bookkeeper/BookKeeper.java | 55 +++--- .../journal/bookkeeper/BookKeeperFactory.java | 6 +- .../impl/publisher/DistributionPublisher.java | 29 ++-- .../impl/publisher/MessagingCacheCallback.java | 10 +- .../impl/publisher/PubQueueProviderPublisher.java | 6 +- .../impl/subscriber/DistributionSubscriber.java | 14 +- .../journal/shared/PublishMetrics.java | 188 +++++++++++++++++++++ ...nMetricsService.java => SubscriberMetrics.java} | 103 ++--------- .../journal/bookkeeper/BookKeeperTest.java | 8 +- .../impl/publisher/DistributionPublisherTest.java | 12 +- .../impl/publisher/MessagingCacheCallbackTest.java | 8 +- .../journal/impl/subscriber/SubscriberTest.java | 4 +- ...ServiceTest.java => SubscriberMetricsTest.java} | 14 +- 13 files changed, 274 insertions(+), 183 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java index 2bb1a84..2a2b023 100644 --- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java +++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java @@ -49,7 +49,7 @@ import org.apache.sling.distribution.journal.messages.LogMessage; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status; -import org.apache.sling.distribution.journal.shared.DistributionMetricsService; +import org.apache.sling.distribution.journal.shared.SubscriberMetrics; import org.apache.sling.distribution.journal.shared.NoOpImportPostProcessor; import org.apache.sling.distribution.journal.shared.NoOpInvalidationProcessor; import org.osgi.service.event.Event; @@ -88,7 +88,7 @@ public class BookKeeper { private final Logger log = LoggerFactory.getLogger(this.getClass()); private final ResourceResolverFactory resolverFactory; - private final DistributionMetricsService distributionMetricsService; + private final SubscriberMetrics subscriberMetrics; private final PackageHandler packageHandler; private final EventAdmin eventAdmin; private final Consumer<PackageStatusMessage> sender; @@ -103,14 +103,7 @@ public class BookKeeper { private final InvalidationProcessor invalidationProcessor; private int skippedCounter = 0; - public BookKeeper(ResourceResolverFactory resolverFactory, DistributionMetricsService distributionMetricsService, - PackageHandler packageHandler, EventAdmin eventAdmin, Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender, - BookKeeperConfig config) { - this(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender, - logSender, config, new NoOpImportPostProcessor(), new NoOpInvalidationProcessor()); - } - - public BookKeeper(ResourceResolverFactory resolverFactory, DistributionMetricsService distributionMetricsService, + public BookKeeper(ResourceResolverFactory resolverFactory, SubscriberMetrics subscriberMetrics, PackageHandler packageHandler, EventAdmin eventAdmin, Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender, BookKeeperConfig config, ImportPostProcessor importPostProcessor, InvalidationProcessor invalidationProcessor) { this.packageHandler = packageHandler; @@ -118,10 +111,10 @@ public class BookKeeper { this.sender = sender; this.logSender = logSender; this.config = config; - String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + config.getSubAgentName(); - distributionMetricsService.createGauge(nameRetries, packageRetries::getSum); + + subscriberMetrics.currentRetries(config.getSubAgentName(), packageRetries::getSum); this.resolverFactory = resolverFactory; - this.distributionMetricsService = distributionMetricsService; + this.subscriberMetrics = subscriberMetrics; // Error queues are enabled when the number // of retry attempts is limited ; disabled otherwise this.errorQueueEnabled = (config.getMaxRetries() >= 0); @@ -150,7 +143,7 @@ public class BookKeeper { */ public void importPackage(PackageMessage pkgMsg, long offset, long createdTime) throws DistributionException { log.debug("Importing distribution package {} at offset={}", pkgMsg, offset); - try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time(); + try (Timer.Context context = subscriberMetrics.getImportedPackageDuration().time(); ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) { packageHandler.apply(importerResolver, pkgMsg); if (config.isEditable()) { @@ -158,8 +151,8 @@ public class BookKeeper { } storeOffset(importerResolver, offset); importerResolver.commit(); - distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength()); - distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS); + subscriberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength()); + subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS); // Execute the post-processor postProcess(pkgMsg); @@ -169,7 +162,7 @@ public class BookKeeper { Event event = new AppliedEvent(pkgMsg, config.getSubAgentName()).toEvent(); eventAdmin.postEvent(event); log.info("Imported distribution package {} at offset={}", pkgMsg, offset); - distributionMetricsService.getPackageStatusCounter(Status.IMPORTED.name()).increment(); + subscriberMetrics.getPackageStatusCounter(Status.IMPORTED.name()).increment(); } catch (DistributionException | LoginException | IOException | RuntimeException | ImportPostProcessException e) { failure(pkgMsg, offset, e); } @@ -184,7 +177,7 @@ public class BookKeeper { props.put(DISTRIBUTION_PACKAGE_ID, pkgMsg.getPkgId()); long invalidationStartTime = currentTimeMillis(); - distributionMetricsService.getInvalidationProcessRequest().increment(); + subscriberMetrics.getInvalidationProcessRequest().increment(); invalidationProcessor.process(props); @@ -202,9 +195,9 @@ public class BookKeeper { log.info("Invalidated the cache for the package {} at offset={}", pkgMsg, offset); - distributionMetricsService.getPackageStatusCounter(Status.IMPORTED.name()).increment(); - distributionMetricsService.getInvalidationProcessDuration().update((currentTimeMillis() - invalidationStartTime), TimeUnit.MILLISECONDS); - distributionMetricsService.getInvalidationProcessSuccess().increment(); + subscriberMetrics.getPackageStatusCounter(Status.IMPORTED.name()).increment(); + subscriberMetrics.getInvalidationProcessDuration().update((currentTimeMillis() - invalidationStartTime), TimeUnit.MILLISECONDS); + subscriberMetrics.getInvalidationProcessSuccess().increment(); } catch (LoginException | PersistenceException | InvalidationProcessException e) { failure(pkgMsg, offset, e); } @@ -219,13 +212,13 @@ public class BookKeeper { props.put(DISTRIBUTION_PACKAGE_ID, pkgMsg.getPkgId()); long postProcessStartTime = currentTimeMillis(); - distributionMetricsService.getImportPostProcessRequest().increment(); + subscriberMetrics.getImportPostProcessRequest().increment(); importPostProcessor.process(props); log.debug("Executed import post processor for package [{}]", pkgMsg.getPkgId()); - distributionMetricsService.getImportPostProcessDuration().update((currentTimeMillis() - postProcessStartTime), TimeUnit.MILLISECONDS); - distributionMetricsService.getImportPostProcessSuccess().increment(); + subscriberMetrics.getImportPostProcessDuration().update((currentTimeMillis() - postProcessStartTime), TimeUnit.MILLISECONDS); + subscriberMetrics.getImportPostProcessSuccess().increment(); } /** @@ -238,7 +231,7 @@ public class BookKeeper { * @throws DistributionException if the package should be retried */ private void failure(PackageMessage pkgMsg, long offset, Exception e) throws DistributionException { - distributionMetricsService.getFailedPackageImports().mark(); + subscriberMetrics.getFailedPackageImports().mark(); String pubAgentName = pkgMsg.getPubAgentName(); int retries = packageRetries.get(pubAgentName); @@ -255,7 +248,7 @@ public class BookKeeper { if (giveUp) { log.warn(msg, e); removeFailedPackage(pkgMsg, offset); - distributionMetricsService.getPermanentImportErrors().increment(); + subscriberMetrics.getPermanentImportErrors().increment(); } else { packageRetries.increase(pubAgentName); throw new DistributionException(msg, e); @@ -278,7 +271,7 @@ public class BookKeeper { public void removePackage(PackageMessage pkgMsg, long offset) throws LoginException, PersistenceException { log.info("Removing distribution package {} of type {} at offset {}", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset); - Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time(); + Timer.Context context = subscriberMetrics.getRemovedPackageDuration().time(); try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) { if (config.isEditable()) { storeStatus(resolver, new PackageStatus(Status.REMOVED, offset, pkgMsg.getPubAgentName())); @@ -288,7 +281,7 @@ public class BookKeeper { } packageRetries.clear(pkgMsg.getPubAgentName()); context.stop(); - distributionMetricsService.getPackageStatusCounter(Status.REMOVED.name()).increment(); + subscriberMetrics.getPackageStatusCounter(Status.REMOVED.name()).increment(); } public void skipPackage(long offset) throws LoginException, PersistenceException { @@ -373,7 +366,7 @@ public class BookKeeper { public void clearPackageRetriesOnSuccess(PackageMessage pkgMsg) { String pubAgentName = pkgMsg.getPubAgentName(); if (packageRetries.get(pubAgentName) > 0) { - distributionMetricsService.getTransientImportErrors().increment(); + subscriberMetrics.getTransientImportErrors().increment(); } packageRetries.clear(pubAgentName); @@ -395,7 +388,7 @@ public class BookKeeper { private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException { log.info("Removing failed distribution package {} at offset={}", pkgMsg, offset); - Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time(); + Timer.Context context = subscriberMetrics.getRemovedFailedPackageDuration().time(); try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) { storeStatus(resolver, new PackageStatus(Status.REMOVED_FAILED, offset, pkgMsg.getPubAgentName())); storeOffset(resolver, offset); @@ -404,7 +397,7 @@ public class BookKeeper { throw new DistributionException("Error removing failed package", e); } context.stop(); - distributionMetricsService.getPackageStatusCounter(Status.REMOVED_FAILED.name()).increment(); + subscriberMetrics.getPackageStatusCounter(Status.REMOVED_FAILED.name()).increment(); } private void storeStatus(ResourceResolver resolver, PackageStatus packageStatus) throws PersistenceException { diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java index b2df2ef..79ee33e 100644 --- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java +++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java @@ -27,7 +27,7 @@ import org.apache.sling.distribution.InvalidationProcessor; import org.apache.sling.distribution.journal.messages.LogMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.BinaryStore; -import org.apache.sling.distribution.journal.shared.DistributionMetricsService; +import org.apache.sling.distribution.journal.shared.SubscriberMetrics; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; @@ -39,7 +39,7 @@ public class BookKeeperFactory { private ResourceResolverFactory resolverFactory; @Reference - private DistributionMetricsService distributionMetricsService; + private SubscriberMetrics subscriberMetrics; @Reference private EventAdmin eventAdmin; @@ -69,7 +69,7 @@ public class BookKeeperFactory { PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor, binaryStore); return new BookKeeper( resolverFactory, - distributionMetricsService, + subscriberMetrics, packageHandler, eventAdmin, statusSender, diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java index a2a8874..ae1dc4a 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java @@ -23,7 +23,7 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static org.apache.sling.distribution.DistributionRequestState.ACCEPTED; import static org.apache.sling.distribution.DistributionRequestType.*; -import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed; +import static org.apache.sling.distribution.journal.shared.SubscriberMetrics.timed; import static org.apache.sling.distribution.journal.shared.Strings.requireNotBlank; import static org.osgi.service.component.annotations.ReferenceCardinality.OPTIONAL; import static org.osgi.service.component.annotations.ReferencePolicyOption.GREEDY; @@ -44,7 +44,7 @@ import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.apache.sling.distribution.journal.queue.PubQueueProvider; import org.apache.sling.distribution.journal.shared.DefaultDistributionLog; import org.apache.sling.distribution.journal.shared.DistributionLogEventListener; -import org.apache.sling.distribution.journal.shared.DistributionMetricsService; +import org.apache.sling.distribution.journal.shared.PublishMetrics; import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.distribution.DistributionRequest; @@ -89,7 +89,7 @@ public class DistributionPublisher implements DistributionAgent { private final EventAdmin eventAdmin; - private final DistributionMetricsService distributionMetricsService; + private final PublishMetrics publishMetrics; private final PubQueueProvider pubQueueProvider; @@ -124,7 +124,7 @@ public class DistributionPublisher implements DistributionAgent { @Reference Topics topics, @Reference - DistributionMetricsService distributionMetricsService, + PublishMetrics publishMetrics, @Reference PubQueueProvider pubQueueProvider, @Reference(target = "(osgi.condition.id=toggle.FT_SLING-12218)", cardinality = OPTIONAL, policyOption = GREEDY) @@ -135,7 +135,7 @@ public class DistributionPublisher implements DistributionAgent { this.packageBuilder = packageBuilder; this.factory = requireNonNull(factory); this.eventAdmin = eventAdmin; - this.distributionMetricsService = requireNonNull(distributionMetricsService); + this.publishMetrics = requireNonNull(publishMetrics); this.pubQueueProvider = pubQueueProvider; pubAgentName = requireNotBlank(config.name()); @@ -149,8 +149,7 @@ public class DistributionPublisher implements DistributionAgent { pkgType = packageBuilder.getType(); this.sender = messagingProvider.createSender(topics.getPackageTopic()); - distributionMetricsService.createGauge( - DistributionMetricsService.PUB_COMPONENT + ".subscriber_count;pub_name=" + pubAgentName, + publishMetrics.subscriberCount(pubAgentName, () -> discoveryService.getTopologyView().getSubscribedAgentIds(pubAgentName).size() ); @@ -181,11 +180,11 @@ public class DistributionPublisher implements DistributionAgent { try { DistributionQueue queue = pubQueueProvider.getQueue(pubAgentName, queueName); if (queue == null) { - distributionMetricsService.getQueueAccessErrorCount().increment(); + publishMetrics.getQueueAccessErrorCount().increment(); } return queue; } catch (Exception e) { - distributionMetricsService.getQueueAccessErrorCount().increment(); + publishMetrics.getQueueAccessErrorCount().increment(); throw e; } } @@ -247,9 +246,9 @@ public class DistributionPublisher implements DistributionAgent { if (request.getRequestType() != TEST && request.getPaths().length == 0) { throw new DistributionException("Empty paths are not allowed"); } - return timed(distributionMetricsService.getBuildPackageDuration(), () -> factory.create(packageBuilder, resourceResolver, pubAgentName, request)); + return timed(publishMetrics.getBuildPackageDuration(), () -> factory.create(packageBuilder, resourceResolver, pubAgentName, request)); } catch (Exception e) { - distributionMetricsService.getDroppedRequests().mark(); + publishMetrics.getDroppedRequests().mark(); String msg = format("Failed to create content package for requestType=%s, paths=%s. Error=%s", request.getRequestType(), Arrays.toString(request.getPaths()), e.getMessage()); distLog.error(msg, e); @@ -260,14 +259,14 @@ public class DistributionPublisher implements DistributionAgent { @Nonnull private DistributionResponse send(final PackageMessage pkg, int queueSize, int delayMS) throws DistributionException { try { - long offset = timed(distributionMetricsService.getEnqueuePackageDuration(), () -> this.sendAndWait(pkg)); - distributionMetricsService.getExportedPackageSize().update(pkg.getPkgLength()); - distributionMetricsService.getAcceptedRequests().mark(); + long offset = timed(publishMetrics.getEnqueuePackageDuration(), () -> this.sendAndWait(pkg)); + publishMetrics.getExportedPackageSize().update(pkg.getPkgLength()); + publishMetrics.getAcceptedRequests().mark(); String msg = format("Request accepted with distribution package %s at offset=%d, queueSize=%d, queueSizeDelay=%d", pkg, offset, queueSize, delayMS); distLog.info(msg); return new SimpleDistributionResponse(ACCEPTED, msg, pkg::getPkgId); } catch (Throwable e) { - distributionMetricsService.getDroppedRequests().mark(); + publishMetrics.getDroppedRequests().mark(); String msg = format("Failed to append distribution package %s to the journal", pkg); distLog.error(msg, e); if (e instanceof Error) { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java index 9f5f84d..23e8df7 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java @@ -39,7 +39,7 @@ import org.apache.sling.distribution.journal.queue.CacheCallback; import org.apache.sling.distribution.journal.queue.ClearCallback; import org.apache.sling.distribution.journal.queue.QueueState; import org.apache.sling.distribution.journal.shared.AgentId; -import org.apache.sling.distribution.journal.shared.DistributionMetricsService; +import org.apache.sling.distribution.journal.shared.PublishMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +50,7 @@ public class MessagingCacheCallback implements CacheCallback { private final String packageTopic; - private final DistributionMetricsService distributionMetricsService; + private final PublishMetrics publishMetrics; private final DiscoveryService discoveryService; @@ -59,12 +59,12 @@ public class MessagingCacheCallback implements CacheCallback { public MessagingCacheCallback( MessagingProvider messagingProvider, String packageTopic, - DistributionMetricsService distributionMetricsService, + PublishMetrics publishMetrics, DiscoveryService discoveryService, Consumer<ClearCommand> commandSender) { this.messagingProvider = messagingProvider; this.packageTopic = packageTopic; - this.distributionMetricsService = distributionMetricsService; + this.publishMetrics = publishMetrics; this.discoveryService = discoveryService; this.commandSender = commandSender; } @@ -84,7 +84,7 @@ public class MessagingCacheCallback implements CacheCallback { @Override public List<FullMessage<PackageMessage>> fetchRange(long minOffset, long maxOffset) throws InterruptedException { - distributionMetricsService.getQueueCacheFetchCount().increment(); + publishMetrics.getQueueCacheFetchCount().increment(); return new RangePoller(messagingProvider, packageTopic, minOffset, maxOffset, RangePoller.DEFAULT_SEED_DELAY_SECONDS) .fetchRange(); } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java index bc44823..7f64427 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PubQueueProviderPublisher.java @@ -32,7 +32,7 @@ import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.queue.CacheCallback; import org.apache.sling.distribution.journal.queue.PubQueueProvider; import org.apache.sling.distribution.journal.queue.PubQueueProviderFactory; -import org.apache.sling.distribution.journal.shared.DistributionMetricsService; +import org.apache.sling.distribution.journal.shared.PublishMetrics; import org.apache.sling.distribution.journal.shared.Topics; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceRegistration; @@ -58,7 +58,7 @@ public class PubQueueProviderPublisher { private Topics topics; @Reference - private DistributionMetricsService distributionMetricsService; + private PublishMetrics publishMetrics; @Reference private PubQueueProviderFactory pubQueueProviderFactory; @@ -75,7 +75,7 @@ public class PubQueueProviderPublisher { CacheCallback callback = new MessagingCacheCallback( messagingProvider, topics.getPackageTopic(), - distributionMetricsService, + publishMetrics, discoveryService, commandSender); this.pubQueueProvider = pubQueueProviderFactory.create(callback); diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index 9f805f6..d09a8e7 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -71,7 +71,7 @@ import org.apache.sling.distribution.journal.messages.OffsetMessage; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.shared.Delay; -import org.apache.sling.distribution.journal.shared.DistributionMetricsService; +import org.apache.sling.distribution.journal.shared.SubscriberMetrics; import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.apache.sling.settings.SlingSettingsService; @@ -119,7 +119,7 @@ public class DistributionSubscriber { private Precondition precondition; @Reference - private DistributionMetricsService distributionMetricsService; + private SubscriberMetrics subscriberMetrics; @Reference BookKeeperFactory bookKeeperFactory; @@ -270,7 +270,7 @@ public class DistributionSubscriber { private void handlePackageMessage(MessageInfo info, PackageMessage message) { if (shouldEnqueue(info, message)) { - distributionMetricsService.getPackageJournalDistributionDuration() + subscriberMetrics.getPackageJournalDistributionDuration() .update((currentTimeMillis() - info.getCreateTime()), TimeUnit.MILLISECONDS); enqueue(new FullMessage<>(info, message)); } else { @@ -307,7 +307,7 @@ public class DistributionSubscriber { try { while (running) { if (messageBuffer.offer(message, 1000, TimeUnit.MILLISECONDS)) { - distributionMetricsService.getItemsBufferSize().increment(); + subscriberMetrics.getItemsBufferSize().increment(); return; } } @@ -342,10 +342,10 @@ public class DistributionSubscriber { DistributionException, ImportPostProcessException { blockingSendStoredStatus(); FullMessage<PackageMessage> item = blockingPeekQueueItem(); - try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) { + try (Timer.Context context = subscriberMetrics.getProcessQueueItemDuration().time()) { processQueueItem(item); messageBuffer.remove(); - distributionMetricsService.getItemsBufferSize().decrement(); + subscriberMetrics.getItemsBufferSize().decrement(); catchAllDelay = catchAllDelays.get(); } } @@ -354,7 +354,7 @@ public class DistributionSubscriber { * Send status stored in a previous run if exists */ private void blockingSendStoredStatus() throws InterruptedException, IOException { - try (Timer.Context context = distributionMetricsService.getSendStoredStatusDuration().time()) { + try (Timer.Context context = subscriberMetrics.getSendStoredStatusDuration().time()) { int retry = 0; while (running) { if (bookKeeper.sendStoredStatus(retry)) { diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/PublishMetrics.java b/src/main/java/org/apache/sling/distribution/journal/shared/PublishMetrics.java new file mode 100644 index 0000000..430c8cb --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/shared/PublishMetrics.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.shared; + +import static java.lang.String.format; + +import java.util.function.Supplier; + +import org.apache.sling.commons.metrics.Counter; +import org.apache.sling.commons.metrics.Gauge; +import org.apache.sling.commons.metrics.Histogram; +import org.apache.sling.commons.metrics.Meter; +import org.apache.sling.commons.metrics.MetricsService; +import org.apache.sling.commons.metrics.Timer; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; + +@Component(service = PublishMetrics.class) +public class PublishMetrics { + + public static final String BASE_COMPONENT = "distribution.journal"; + + public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher"; + + private final MetricsService metricsService; + + private final Histogram exportedPackageSize; + + private final Meter acceptedRequests; + + private final Meter droppedRequests; + + private final Timer buildPackageDuration; + + private final Timer enqueuePackageDuration; + + private final Counter queueCacheFetchCount; + + private final Counter queueAccessErrorCount; + + @Activate + public PublishMetrics(@Reference MetricsService metricsService) { + this.metricsService = metricsService; + exportedPackageSize = getHistogram(getMetricName(PUB_COMPONENT, "exported_package_size")); + acceptedRequests = getMeter(getMetricName(PUB_COMPONENT, "accepted_requests")); + droppedRequests = getMeter(getMetricName(PUB_COMPONENT, "dropped_requests")); + buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "build_package_duration")); + enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, "enqueue_package_duration")); + queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, "queue_cache_fetch_count")); + queueAccessErrorCount = getCounter(getMetricName(PUB_COMPONENT, "queue_access_error_count")); + } + + /** + * Histogram of the exported content package size in Bytes. + * + * @return a Sling Metrics histogram + */ + public Histogram getExportedPackageSize() { + return exportedPackageSize; + } + + /** + * Meter of requests returning an {@code DistributionRequestState.ACCEPTED} state. + * + * @return a Sling Metrics meter + */ + public Meter getAcceptedRequests() { + return acceptedRequests; + } + + /** + * Meter of requests returning an {@code DistributionRequestState.DROPPED} state. + * + * @return a Sling Metrics meter + */ + public Meter getDroppedRequests() { + return droppedRequests; + } + + /** + * Timer capturing the duration in ms of building a content package + * + * @return a Sling Metric timer + */ + public Timer getBuildPackageDuration() { + return buildPackageDuration; + } + + /** + * Timer capturing the duration in ms of adding a package to the queue + * + * @return a Sling Metric timer + */ + public Timer getEnqueuePackageDuration() { + return enqueuePackageDuration; + } + + /** + * Counter of fetch operations to feed the queue cache. + * + * @return a Sling Metric counter + */ + public Counter getQueueCacheFetchCount() { + return queueCacheFetchCount; + } + + /** + * Counter of queue access errors. + * + * @return a Sling Metric counter + */ + public Counter getQueueAccessErrorCount() { + return queueAccessErrorCount; + } + + /** + * Counter of journal error codes. + * + * @return a Sling Metric counter + */ + public Counter getJournalErrorCodeCount(String errorCode) { + return getCounter( + getNameWithLabel(getMetricName(BASE_COMPONENT, "journal_unavailable_error_code_count"), "error_code", errorCode)); + } + + /** + * Counter for all the different package status. + * + * @return a Sling Metric counter + */ + public Counter getPackageStatusCounter(String status) { + return getCounter( + getNameWithLabel(getMetricName(BASE_COMPONENT, "package_status_count"), "status", status) + ); + } + + public void subscriberCount(String pubAgentName, Supplier<Integer> subscriberCountCallback) { + createGauge(PublishMetrics.PUB_COMPONENT + ".subscriber_count;pub_name=" + pubAgentName, + subscriberCountCallback); + + } + + private <T> Gauge<T> createGauge(String name, Supplier<T> supplier) { + return metricsService.gauge(name, supplier); + } + + private String getMetricName(String component, String name) { + return format("%s.%s", component, name); + } + + private String getNameWithLabel(String name, String label, String labelVal) { + return format("%s;%s=%s", name, label, labelVal); + } + + private Counter getCounter(String metricName) { + return metricsService.counter(metricName); + } + + private Timer getTimer(String metricName) { + return metricsService.timer(metricName); + } + + private Histogram getHistogram(String metricName) { + return metricsService.histogram(metricName); + } + + private Meter getMeter(String metricName) { + return metricsService.meter(metricName); + } + +} diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/shared/SubscriberMetrics.java similarity index 77% rename from src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java rename to src/main/java/org/apache/sling/distribution/journal/shared/SubscriberMetrics.java index c77e37f..b0d656c 100644 --- a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java +++ b/src/main/java/org/apache/sling/distribution/journal/shared/SubscriberMetrics.java @@ -33,25 +33,17 @@ import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; -@Component(service = DistributionMetricsService.class) -public class DistributionMetricsService { +@Component(service = SubscriberMetrics.class) +public class SubscriberMetrics { public static final String BASE_COMPONENT = "distribution.journal"; - public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher"; - public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber"; private final MetricsService metricsService; private final Histogram importedPackageSize; - private final Histogram exportedPackageSize; - - private final Meter acceptedRequests; - - private final Meter droppedRequests; - private final Counter itemsBufferSize; private final Timer removedPackageDuration; @@ -70,14 +62,6 @@ public class DistributionMetricsService { private final Timer packageJournalDistributionDuration; - private final Timer buildPackageDuration; - - private final Timer enqueuePackageDuration; - - private final Counter queueCacheFetchCount; - - private final Counter queueAccessErrorCount; - private final Timer importPostProcessDuration; private final Counter importPostProcessSuccess; @@ -95,14 +79,8 @@ public class DistributionMetricsService { private final Counter permanentImportErrors; @Activate - public DistributionMetricsService(@Reference MetricsService metricsService) { + public SubscriberMetrics(@Reference MetricsService metricsService) { this.metricsService = metricsService; - exportedPackageSize = getHistogram(getMetricName(PUB_COMPONENT, "exported_package_size")); - acceptedRequests = getMeter(getMetricName(PUB_COMPONENT, "accepted_requests")); - droppedRequests = getMeter(getMetricName(PUB_COMPONENT, "dropped_requests")); - buildPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "build_package_duration")); - enqueuePackageDuration = getTimer(getMetricName(PUB_COMPONENT, "enqueue_package_duration")); - queueCacheFetchCount = getCounter(getMetricName(PUB_COMPONENT, "queue_cache_fetch_count")); importedPackageSize = getHistogram(getMetricName(SUB_COMPONENT, "imported_package_size")); itemsBufferSize = getCounter(getMetricName(SUB_COMPONENT, "items_buffer_size")); importedPackageDuration = getTimer(getMetricName(SUB_COMPONENT, "imported_package_duration")); @@ -113,11 +91,10 @@ public class DistributionMetricsService { processQueueItemDuration = getTimer(getMetricName(SUB_COMPONENT, "process_queue_item_duration")); packageDistributedDuration = getTimer(getMetricName(SUB_COMPONENT, "request_distributed_duration")); packageJournalDistributionDuration = getTimer(getMetricName(SUB_COMPONENT, "package_journal_distribution_duration")); - queueAccessErrorCount = getCounter(getMetricName(PUB_COMPONENT, "queue_access_error_count")); - importPostProcessDuration = getTimer(getMetricName(PUB_COMPONENT, "import_post_process_duration")); + importPostProcessDuration = getTimer(getMetricName(SUB_COMPONENT, "import_post_process_duration")); importPostProcessSuccess = getCounter(getMetricName(SUB_COMPONENT, "import_post_process_success_count")); importPostProcessRequest = getCounter(getMetricName(SUB_COMPONENT, "import_post_process_request_count")); - invalidationProcessDuration = getTimer(getMetricName(PUB_COMPONENT, "invalidation_process_duration")); + invalidationProcessDuration = getTimer(getMetricName(SUB_COMPONENT, "invalidation_process_duration")); invalidationProcessSuccess = getCounter(getMetricName(SUB_COMPONENT, "invalidation_process_success_count")); invalidationProcessRequest = getCounter(getMetricName(SUB_COMPONENT, "invalidation_process_request_count")); transientImportErrors = getCounter(getMetricName(SUB_COMPONENT, "transient_import_errors")); @@ -164,33 +141,6 @@ public class DistributionMetricsService { return importedPackageSize; } - /** - * Histogram of the exported content package size in Bytes. - * - * @return a Sling Metrics histogram - */ - public Histogram getExportedPackageSize() { - return exportedPackageSize; - } - - /** - * Meter of requests returning an {@code DistributionRequestState.ACCEPTED} state. - * - * @return a Sling Metrics meter - */ - public Meter getAcceptedRequests() { - return acceptedRequests; - } - - /** - * Meter of requests returning an {@code DistributionRequestState.DROPPED} state. - * - * @return a Sling Metrics meter - */ - public Meter getDroppedRequests() { - return droppedRequests; - } - /** * Counter of the package buffer size on the subscriber. * @@ -274,42 +224,6 @@ public class DistributionMetricsService { return packageJournalDistributionDuration; } - /** - * Timer capturing the duration in ms of building a content package - * - * @return a Sling Metric timer - */ - public Timer getBuildPackageDuration() { - return buildPackageDuration; - } - - /** - * Timer capturing the duration in ms of adding a package to the queue - * - * @return a Sling Metric timer - */ - public Timer getEnqueuePackageDuration() { - return enqueuePackageDuration; - } - - /** - * Counter of fetch operations to feed the queue cache. - * - * @return a Sling Metric counter - */ - public Counter getQueueCacheFetchCount() { - return queueCacheFetchCount; - } - - /** - * Counter of queue access errors. - * - * @return a Sling Metric counter - */ - public Counter getQueueAccessErrorCount() { - return queueAccessErrorCount; - } - /** * Counter of journal error codes. * @@ -331,7 +245,7 @@ public class DistributionMetricsService { ); } - public <T> Gauge<T> createGauge(String name, Supplier<T> supplier) { + private <T> Gauge<T> createGauge(String name, Supplier<T> supplier) { return metricsService.gauge(name, supplier); } @@ -391,4 +305,9 @@ public class DistributionMetricsService { return permanentImportErrors; } + public void currentRetries(String subAgentName, Supplier<Integer> retriesCallback) { + String nameRetries = SubscriberMetrics.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName; + createGauge(nameRetries, retriesCallback); + } + } diff --git a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java index 818cb7c..1df8910 100644 --- a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java @@ -38,7 +38,7 @@ import org.apache.sling.distribution.common.DistributionException; import org.apache.sling.distribution.journal.messages.LogMessage; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; -import org.apache.sling.distribution.journal.shared.DistributionMetricsService; +import org.apache.sling.distribution.journal.shared.SubscriberMetrics; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory; import org.junit.Before; @@ -58,7 +58,7 @@ public class BookKeeperTest { private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory(); - private DistributionMetricsService distributionMetricsService; + private SubscriberMetrics subscriberMetrics; @Mock private EventAdmin eventAdmin; @@ -85,10 +85,10 @@ public class BookKeeperTest { @Before public void before() { - distributionMetricsService = new DistributionMetricsService(MetricsService.NOOP); + subscriberMetrics = new SubscriberMetrics(MetricsService.NOOP); BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", "subSlingId", true, 10, PackageHandling.Extract, "package", true); - bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender, logSender, bkConfig, + bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics, packageHandler, eventAdmin, sender, logSender, bkConfig, importPostProcessor, invalidationProcessor); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java index 37028f4..1182734 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java @@ -63,7 +63,7 @@ import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.queue.PubQueueProvider; import org.apache.sling.distribution.journal.queue.impl.OffsetQueueImpl; import org.apache.sling.distribution.journal.queue.impl.PubQueue; -import org.apache.sling.distribution.journal.shared.DistributionMetricsService; +import org.apache.sling.distribution.journal.shared.PublishMetrics; import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.apache.sling.distribution.queue.spi.DistributionQueue; @@ -109,7 +109,7 @@ public class DistributionPublisherTest { @Mock private DistributionPackageBuilder packageBuilder; - private DistributionMetricsService distributionMetricsService; + private PublishMetrics publishMetrics; private OsgiContext context = new OsgiContext(); @@ -133,7 +133,7 @@ public class DistributionPublisherTest { @Before public void before() throws Exception { MetricsService metricsService = context.registerInjectActivateService(MetricsServiceImpl.class); - distributionMetricsService = new DistributionMetricsService(metricsService); + publishMetrics = new PublishMetrics(metricsService); when(packageBuilder.getType()).thenReturn("journal"); Map<String, String> props = Map.of("name", PUB1AGENT1, "maxQueueSizeDelay", "1000"); @@ -142,7 +142,7 @@ public class DistributionPublisherTest { BundleContext bcontext = context.bundleContext(); when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender); publisher = new DistributionPublisher(messagingProvider, packageBuilder, discoveryService, factory, - eventAdmin, topics, distributionMetricsService, pubQueueProvider, Condition.INSTANCE, config, bcontext); + eventAdmin, topics, publishMetrics, pubQueueProvider, Condition.INSTANCE, config, bcontext); when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier); } @@ -251,7 +251,7 @@ public class DistributionPublisherTest { DistributionQueue queue = publisher.getQueue("i_am_not_a_queue"); assertNull(queue); - Counter counter = distributionMetricsService.getQueueAccessErrorCount(); + Counter counter = publishMetrics.getQueueAccessErrorCount(); assertEquals("Wrong queue counter expected",1, counter.getCount()); } @@ -265,7 +265,7 @@ public class DistributionPublisherTest { fail("Expected exception not thrown"); } catch (RuntimeException expectedException) { } - Counter counter = distributionMetricsService.getQueueAccessErrorCount(); + Counter counter = publishMetrics.getQueueAccessErrorCount(); assertEquals("Wrong getQueue error counter",1, counter.getCount()); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java index 7b148d5..ddeebd9 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java @@ -49,7 +49,7 @@ import org.apache.sling.distribution.journal.messages.ClearCommand; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.apache.sling.distribution.journal.queue.QueueState; -import org.apache.sling.distribution.journal.shared.DistributionMetricsService; +import org.apache.sling.distribution.journal.shared.PublishMetrics; import org.apache.sling.distribution.journal.shared.TestMessageInfo; import org.apache.sling.distribution.journal.shared.Topics; import org.junit.Before; @@ -87,7 +87,7 @@ public class MessagingCacheCallbackTest { private JournalAvailable journalAvailable; @Mock - private DistributionMetricsService distributionMetricsService; + private PublishMetrics publishMetrics; @Mock private MessageHandler<PackageMessage> handler; @@ -113,7 +113,7 @@ public class MessagingCacheCallbackTest { @Before public void before() { callback = new MessagingCacheCallback(messagingProvider, "package", - distributionMetricsService, discovery, (command) -> sender.accept(command)); + publishMetrics, discovery, (command) -> sender.accept(command)); } @Test @@ -127,7 +127,7 @@ public class MessagingCacheCallbackTest { @Test public void testFetchRange() throws Exception { - when(distributionMetricsService.getQueueCacheFetchCount()).thenReturn(counter); + when(publishMetrics.getQueueCacheFetchCount()).thenReturn(counter); when(messagingProvider.assignTo(10L)).thenReturn("0:10"); CompletableFuture<List<FullMessage<PackageMessage>>> result = CompletableFuture.supplyAsync(this::fetch); verify(messagingProvider, timeout(1000)).createPoller( diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java index 7789039..5d13f72 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java @@ -82,7 +82,7 @@ import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status; import org.apache.sling.distribution.journal.messages.PingMessage; -import org.apache.sling.distribution.journal.shared.DistributionMetricsService; +import org.apache.sling.distribution.journal.shared.SubscriberMetrics; import org.apache.sling.distribution.journal.shared.TestMessageInfo; import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; @@ -177,7 +177,7 @@ public class SubscriberTest { private MessageSender<PackageStatusMessage> statusSender; @Spy - private DistributionMetricsService distributionMetricsService = new DistributionMetricsService(MetricsService.NOOP); + private SubscriberMetrics subscriberMetrics = new SubscriberMetrics(MetricsService.NOOP); @Spy private ImportPostProcessor importPostProcessor = new NoOpImportPostProcessor(); diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java similarity index 83% rename from src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java rename to src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java index 4bca99e..be5e2e3 100644 --- a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java @@ -34,14 +34,14 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) -public class DistributionMetricsServiceTest { +public class SubscriberMetricsTest { - DistributionMetricsService metrics; + SubscriberMetrics metrics; @Before public void before() { MetricsService metricsService = MetricsService.NOOP; - metrics = new DistributionMetricsService(metricsService); + metrics = new SubscriberMetrics(metricsService); } public static void mockBehaviour(MetricsService metricsService) { @@ -55,11 +55,6 @@ public class DistributionMetricsServiceTest { @Test public void testGetMetrics() { - assertNotNull(metrics.getAcceptedRequests()); - assertNotNull(metrics.getBuildPackageDuration()); - assertNotNull(metrics.getDroppedRequests()); - assertNotNull(metrics.getEnqueuePackageDuration()); - assertNotNull(metrics.getExportedPackageSize()); assertNotNull(metrics.getFailedPackageImports()); assertNotNull(metrics.getImportedPackageDuration()); assertNotNull(metrics.getImportedPackageSize()); @@ -67,14 +62,11 @@ public class DistributionMetricsServiceTest { assertNotNull(metrics.getPackageDistributedDuration()); assertNotNull(metrics.getPackageJournalDistributionDuration()); assertNotNull(metrics.getProcessQueueItemDuration()); - assertNotNull(metrics.getQueueCacheFetchCount()); - assertNotNull(metrics.getQueueAccessErrorCount()); assertNotNull(metrics.getRemovedFailedPackageDuration()); assertNotNull(metrics.getRemovedPackageDuration()); assertNotNull(metrics.getSendStoredStatusDuration()); assertNotNull(metrics.getPackageStatusCounter("mockStatus")); assertNotNull(metrics.getTransientImportErrors()); assertNotNull(metrics.getPermanentImportErrors()); - metrics.createGauge("name", () -> 42); } }