This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 696ce67f8a1 IGNITE-27381 Support MessageService metrics (#7488)
696ce67f8a1 is described below
commit 696ce67f8a1a2c493248bf321f2d03ff00786e2a
Author: Aditya Mukhopadhyay <[email protected]>
AuthorDate: Tue Feb 17 12:59:00 2026 +0530
IGNITE-27381 Support MessageService metrics (#7488)
Co-authored-by: Roman Puchkovskiy <[email protected]>
---
.../ignite/internal/cli/CliIntegrationTest.java | 8 ++-
...niteDistributionZoneManagerNodeRestartTest.java | 3 +-
.../ignite/internal/metrics/MetricManagerImpl.java | 12 ++--
.../sources/StripedThreadPoolMetricSource.java | 13 ++--
modules/network/build.gradle | 3 +
.../internal/network/CriticalStripedExecutors.java | 10 ++-
.../CriticalStripedThreadPoolExecutorFactory.java | 35 ++++++++++
.../internal/network/DefaultMessagingService.java | 54 +++++++++++++--
.../network/MessagingServiceMetricSource.java | 76 ++++++++++++++++++++++
.../internal/network/MessagingServiceMetrics.java | 68 +++++++++++++++++++
.../network/scalecube/ScaleCubeClusterService.java | 5 +-
.../network/DefaultMessagingServiceTest.java | 2 +
.../scalecube/TestScaleCubeClusterService.java | 7 +-
.../network/utils/ClusterServiceTestUtils.java | 4 +-
.../raft/ItTruncateSuffixAndRestartTest.java | 4 +-
.../rest/metrics/ItMetricControllerTest.java | 8 ++-
.../runner/app/ItIgniteNodeRestartTest.java | 3 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
.../sql/engine/exec/QueryTaskExecutorImpl.java | 2 +-
modules/workers/build.gradle | 1 +
.../worker/CriticalSingleThreadExecutor.java | 49 ++++++++++++++
.../worker/CriticalStripedThreadPoolExecutor.java | 51 +++++++++++++++
22 files changed, 393 insertions(+), 28 deletions(-)
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
index 695b9a7ebc9..00eae74a96f 100644
---
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.cli;
+import static
org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource.THREAD_POOLS_METRICS_SOURCE_NAME;
import static org.assertj.core.api.Assertions.assertThat;
import io.micronaut.configuration.picocli.MicronautFactory;
@@ -84,7 +85,12 @@ public abstract class CliIntegrationTest extends
ClusterPerClassIntegrationTest
new MetricSource().name("resource.vacuum").enabled(true),
new MetricSource().name("clock.service").enabled(true),
new MetricSource().name("index.builder").enabled(true),
- new MetricSource().name("raft.snapshots").enabled(true)
+ new MetricSource().name("raft.snapshots").enabled(true),
+ new MetricSource().name("messaging").enabled(true),
+ new MetricSource().name(THREAD_POOLS_METRICS_SOURCE_NAME +
".striped.messaging.inbound.default").enabled(true),
+ new MetricSource().name(THREAD_POOLS_METRICS_SOURCE_NAME +
".striped.messaging.inbound.deploymentunits").enabled(true),
+ new MetricSource().name(THREAD_POOLS_METRICS_SOURCE_NAME +
".striped.messaging.inbound.scalecube").enabled(true),
+ new MetricSource().name(THREAD_POOLS_METRICS_SOURCE_NAME +
".messaging.outbound").enabled(true),
};
/** Correct ignite jdbc url. */
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 98bf819ca5e..00f875ed99d 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -253,7 +253,8 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
new NoOpCriticalWorkerRegistry(),
failureProcessor,
defaultChannelTypeRegistry(),
- new DefaultIgniteProductVersionSource()
+ new DefaultIgniteProductVersionSource(),
+ new NoOpMetricManager()
);
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage,
failureProcessor);
diff --git
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManagerImpl.java
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManagerImpl.java
index 7fb5bda92b2..ebb67a26119 100644
---
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManagerImpl.java
+++
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManagerImpl.java
@@ -187,16 +187,20 @@ public class MetricManagerImpl implements MetricManager {
@Override
public void unregisterSource(MetricSource src) {
inBusyLockSafe(busyLock, () -> {
- disable(src);
- registry.unregisterSource(src);
+ if (metricSources().contains(src)) {
+ disable(src);
+ registry.unregisterSource(src);
+ }
});
}
@Override
public void unregisterSource(String srcName) {
inBusyLockSafe(busyLock, () -> {
- disable(srcName);
- registry.unregisterSource(srcName);
+ if (metricSources().stream().anyMatch(metricSource ->
metricSource.name().equals(srcName))) {
+ disable(srcName);
+ registry.unregisterSource(srcName);
+ }
});
}
diff --git
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/StripedThreadPoolMetricSource.java
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/StripedThreadPoolMetricSource.java
index dc6665b2ad4..1392c43fbc8 100644
---
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/StripedThreadPoolMetricSource.java
+++
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/StripedThreadPoolMetricSource.java
@@ -22,18 +22,21 @@ import static
org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource.
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.ignite.internal.metrics.AbstractMetricSource;
import org.apache.ignite.internal.metrics.IntGauge;
import org.apache.ignite.internal.metrics.LongGauge;
import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.thread.AbstractStripedThreadPoolExecutor;
import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
/** Metric source for monitoring of {@link StripedThreadPoolExecutor}. */
-public class StripedThreadPoolMetricSource extends
AbstractMetricSource<StripedThreadPoolMetricSource.Holder> {
+public class StripedThreadPoolMetricSource<T extends
AbstractStripedThreadPoolExecutor<? extends ExecutorService>> extends
+ AbstractMetricSource<StripedThreadPoolMetricSource<T>.Holder> {
/** Striped thread pool to be monitored. */
- private final StripedThreadPoolExecutor exec;
+ private final T exec;
/**
* Creates a new thread pool metric source with the given {@code name} to
monitor the provided striped executor {@code exec},
@@ -44,8 +47,8 @@ public class StripedThreadPoolMetricSource extends
AbstractMetricSource<StripedT
* @param exec Striped thread pool executor to monitor.
* @see StripedThreadPoolExecutor
*/
- public StripedThreadPoolMetricSource(String name, @Nullable String
description, StripedThreadPoolExecutor exec) {
- this(name, null, THREAD_POOLS_GROUP_NAME, exec);
+ public StripedThreadPoolMetricSource(String name, @Nullable String
description, T exec) {
+ this(name, description, THREAD_POOLS_GROUP_NAME, exec);
}
/**
@@ -60,7 +63,7 @@ public class StripedThreadPoolMetricSource extends
AbstractMetricSource<StripedT
String name,
@Nullable String description,
@Nullable String group,
- StripedThreadPoolExecutor exec
+ T exec
) {
super(name, description, group);
diff --git a/modules/network/build.gradle b/modules/network/build.gradle
index fbfe99977b9..bf931fa03cc 100644
--- a/modules/network/build.gradle
+++ b/modules/network/build.gradle
@@ -33,6 +33,7 @@ dependencies {
implementation project(':ignite-vault')
implementation project(':ignite-workers')
implementation project(':ignite-failure-handler')
+ implementation project(':ignite-metrics')
implementation libs.jetbrains.annotations
implementation libs.scalecube.cluster
implementation libs.fastutil.core
@@ -57,6 +58,7 @@ dependencies {
testImplementation testFixtures(project(':ignite-configuration'))
testImplementation testFixtures(project(':ignite-core'))
testImplementation testFixtures(project(':ignite-failure-handler'))
+ testImplementation testFixtures(project(':ignite-metrics'))
testImplementation libs.jmh.core
testImplementation(libs.kryo) {
//IDEA test runner don't apply Gradle dependency resolve strategy,
this is just not implemented
@@ -73,6 +75,7 @@ dependencies {
testFixturesImplementation project(':ignite-configuration')
testFixturesImplementation project(':ignite-configuration-root')
testFixturesImplementation project(':ignite-failure-handler')
+ testFixturesImplementation testFixtures(project(':ignite-metrics'))
testFixturesImplementation testFixtures(project(':ignite-core'))
testFixturesImplementation testFixtures(project(':ignite-configuration'))
testFixturesImplementation testFixtures(project(':ignite-workers'))
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedExecutors.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedExecutors.java
index 4b39cd54dbd..49024eae8e6 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedExecutors.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedExecutors.java
@@ -27,9 +27,11 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.thread.StripedExecutor;
import org.apache.ignite.internal.worker.CriticalWorker;
import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
+import org.jetbrains.annotations.Nullable;
/**
* Collection of {@link StripedExecutor executors} for the network based on
{@link ChannelType#id()}.
@@ -51,11 +53,15 @@ class CriticalStripedExecutors implements ManuallyCloseable
{
String poolNamePrefix,
CriticalWorkerRegistry workerRegistry,
ChannelTypeRegistry channelTypeRegistry,
- IgniteLogger log
+ IgniteLogger log,
+ @Nullable MetricManager metricManager,
+ @Nullable String metricNamePrefix,
+ @Nullable String metricDescription
) {
this.workerRegistry = workerRegistry;
- var factory = new CriticalStripedThreadPoolExecutorFactory(nodeName,
poolNamePrefix, log, workerRegistry, registeredWorkers);
+ var factory = new CriticalStripedThreadPoolExecutorFactory(nodeName,
poolNamePrefix, log, workerRegistry, registeredWorkers,
+ metricManager, metricNamePrefix, metricDescription);
executorByChannelTypeId =
StripedExecutorByChannelTypeId.of(channelTypeRegistry, factory);
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedThreadPoolExecutorFactory.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedThreadPoolExecutorFactory.java
index d2ee399c759..4ac37826555 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedThreadPoolExecutorFactory.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedThreadPoolExecutorFactory.java
@@ -21,9 +21,11 @@ import static
org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED;
import java.util.List;
import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.worker.CriticalStripedThreadPoolExecutor;
import org.apache.ignite.internal.worker.CriticalWorker;
import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
+import org.jetbrains.annotations.Nullable;
/** Factory for creating {@link CriticalStripedThreadPoolExecutor}. */
class CriticalStripedThreadPoolExecutorFactory {
@@ -43,18 +45,45 @@ class CriticalStripedThreadPoolExecutorFactory {
private final List<CriticalWorker> registeredWorkers;
+ @Nullable
+ private final MetricManager metricManager;
+
+ @Nullable
+ private final String metricNamePrefix;
+
+ @Nullable
+ private final String metricDescription;
+
+ @SuppressWarnings("unused")
CriticalStripedThreadPoolExecutorFactory(
String nodeName,
String poolNamePrefix,
IgniteLogger log,
CriticalWorkerRegistry workerRegistry,
List<CriticalWorker> registeredWorkers
+ ) {
+ this(nodeName, poolNamePrefix, log, workerRegistry, registeredWorkers,
null, null, null);
+ }
+
+ CriticalStripedThreadPoolExecutorFactory(
+ String nodeName,
+ String poolNamePrefix,
+ IgniteLogger log,
+ CriticalWorkerRegistry workerRegistry,
+ List<CriticalWorker> registeredWorkers,
+ @Nullable MetricManager metricManager,
+ @Nullable String metricNamePrefix,
+ @Nullable String metricDescription
) {
this.nodeName = nodeName;
this.poolNamePrefix = poolNamePrefix;
this.log = log;
this.workerRegistry = workerRegistry;
this.registeredWorkers = registeredWorkers;
+
+ this.metricManager = metricManager;
+ this.metricNamePrefix = metricNamePrefix;
+ this.metricDescription = metricDescription;
}
CriticalStripedThreadPoolExecutor create(ChannelType channelType) {
@@ -64,6 +93,12 @@ class CriticalStripedThreadPoolExecutorFactory {
var threadFactory = IgniteMessageServiceThreadFactory.create(nodeName,
poolName, log, NOTHING_ALLOWED);
var executor = new
CriticalStripedThreadPoolExecutor(stripeCountForIndex(channelTypeId),
threadFactory, false, 0);
+ if (metricManager != null && metricNamePrefix != null) {
+ String metricName = String.format("%s.%s", metricNamePrefix,
channelType.name().toLowerCase());
+
+ executor.initMetricSource(metricManager, metricName,
metricDescription);
+ }
+
for (CriticalWorker worker : executor.workers()) {
workerRegistry.register(worker);
registeredWorkers.add(worker);
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index 08f8960a82f..54cf63c323a 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.LONG_HANDLING_LOGGING_ENABLED;
+import static
org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource.THREAD_POOLS_METRICS_SOURCE_NAME;
import static
org.apache.ignite.internal.network.NettyBootstrapFactory.isInNetworkThread;
import static
org.apache.ignite.internal.network.serialization.PerSessionSerializationService.createClassDescriptorsMessages;
import static
org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED;
@@ -58,6 +59,7 @@ import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.handshake.CriticalHandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
@@ -103,6 +105,12 @@ public class DefaultMessagingService extends
AbstractMessagingService {
private final FailureProcessor failureProcessor;
+ private final MetricManager metricManager;
+
+ private final MessagingServiceMetricSource metricSource = new
MessagingServiceMetricSource();
+
+ private final MessagingServiceMetrics metrics = new
MessagingServiceMetrics(metricSource);
+
/** Connection manager that provides access to {@link NettySender}. */
private final ConnectionManager connectionManager;
@@ -146,6 +154,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
* @param marshaller Marshaller.
* @param criticalWorkerRegistry Used to register critical threads managed
by the new service and its components.
* @param failureProcessor Failure processor.
+ * @param metricManager The metrics manager.
* @param channelTypeRegistry {@link ChannelType} registry.
*/
public DefaultMessagingService(
@@ -158,6 +167,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
CriticalWorkerRegistry criticalWorkerRegistry,
FailureProcessor failureProcessor,
ConnectionManager connectionManager,
+ MetricManager metricManager,
ChannelTypeRegistry channelTypeRegistry
) {
this.factory = factory;
@@ -167,6 +177,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
this.marshaller = marshaller;
this.criticalWorkerRegistry = criticalWorkerRegistry;
this.failureProcessor = failureProcessor;
+ this.metricManager = metricManager;
this.connectionManager = connectionManager;
connectionManager.addListener(this::handleMessageFromNetwork);
@@ -174,13 +185,18 @@ public class DefaultMessagingService extends
AbstractMessagingService {
outboundExecutor = new CriticalSingleThreadExecutor(
IgniteMessageServiceThreadFactory.create(nodeName,
"MessagingService-outbound", LOG, NOTHING_ALLOWED)
);
+ outboundExecutor.initMetricSource(metricManager,
THREAD_POOLS_METRICS_SOURCE_NAME + ".messaging.outbound",
+ "Outbound message executor metrics");
inboundExecutors = new CriticalStripedExecutors(
nodeName,
"MessagingService-inbound",
criticalWorkerRegistry,
channelTypeRegistry,
- LOG
+ LOG,
+ metricManager,
+ THREAD_POOLS_METRICS_SOURCE_NAME +
".striped.messaging.inbound",
+ "Inbound message executor metrics"
);
timeoutWorker = new TimeoutWorker(
@@ -207,6 +223,8 @@ public class DefaultMessagingService extends
AbstractMessagingService {
InternalClusterNode recipient =
topologyService.getByConsistentId(recipientConsistentId);
if (recipient == null) {
+ metrics.incrementMessageRecipientNotFound();
+
return failedFuture(
new UnresolvableConsistentIdException("Recipient
consistent ID cannot be resolved: " + recipientConsistentId)
);
@@ -237,6 +255,8 @@ public class DefaultMessagingService extends
AbstractMessagingService {
InternalClusterNode recipient =
topologyService.getByConsistentId(recipientConsistentId);
if (recipient == null) {
+ metrics.incrementMessageRecipientNotFound();
+
return failedFuture(
new UnresolvableConsistentIdException("Recipient
consistent ID cannot be resolved: " + recipientConsistentId)
);
@@ -256,6 +276,8 @@ public class DefaultMessagingService extends
AbstractMessagingService {
InternalClusterNode recipient =
topologyService.getByConsistentId(recipientConsistentId);
if (recipient == null) {
+ metrics.incrementMessageRecipientNotFound();
+
return failedFuture(
new UnresolvableConsistentIdException("Recipient
consistent ID cannot be resolved: " + recipientConsistentId)
);
@@ -398,6 +420,8 @@ public class DefaultMessagingService extends
AbstractMessagingService {
.thenComposeToCompletable(sender -> {
if (strictIdCheck && nodeId != null &&
!sender.launchId().equals(nodeId)) {
// The destination node has been rebooted, so it's a
different node instance.
+ metrics.incrementMessageRecipientNotFound();
+
throw new RecipientLeftException("Target node ID is "
+ nodeId + ", but " + sender.launchId() + " responded");
}
@@ -447,6 +471,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
List<HandlerContext> handlerContexts =
getHandlerContexts(message.groupType());
// Specially made by a classic loop for optimization.
+ //noinspection ForLoopReplaceableByForEach
for (int i = 0; i < handlerContexts.size(); i++) {
HandlerContext handlerContext = handlerContexts.get(i);
@@ -497,17 +522,21 @@ public class DefaultMessagingService extends
AbstractMessagingService {
Long finalCorrelationId = correlationId;
firstHandlerExecutor.execute(() -> {
- long startedNanos = longHandlingLoggingEnabled ? System.nanoTime()
: 0;
+ long startedNanos = System.nanoTime();
try {
handleStartingWithFirstHandler(payload, finalCorrelationId,
inNetworkObject, firstHandlerContext, handlerContexts);
} catch (Throwable e) {
+ metrics.incrementMessageHandlingFailures();
+
handleAndRethrowIfError(inNetworkObject, e);
} finally {
- if (longHandlingLoggingEnabled && LOG.isWarnEnabled()) {
- long tookMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNanos);
+ long tookMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNanos);
+
+ if (tookMillis > 100) {
+ metrics.incrementSlowResponses();
- if (tookMillis > 100) {
+ if (longHandlingLoggingEnabled && LOG.isWarnEnabled()) {
LOG.warn(
"Processing of {} from {} took {} ms",
LOG.isDebugEnabled() && includeSensitive() ?
message : message.toStringForLightLogging(),
@@ -647,7 +676,14 @@ public class DefaultMessagingService extends
AbstractMessagingService {
TimeoutObjectImpl responseFuture = requestsMap.remove(correlationId);
if (responseFuture != null) {
- responseFuture.future().complete(response);
+ var fut = responseFuture.future();
+
+ fut.complete(response);
+
+ // Check if it was already completed exceptionally by the timeout
worker.
+ if (fut.isCompletedExceptionally()) {
+ metrics.incrementInvokeTimeouts();
+ }
}
}
@@ -698,12 +734,17 @@ public class DefaultMessagingService extends
AbstractMessagingService {
recipientInetAddrByNodeId.remove(member.id());
}
});
+
+ metricManager.registerSource(metricSource);
+ metricManager.enable(metricSource);
}
/**
* Stops the messaging service.
*/
public void stop() throws Exception {
+ metricManager.unregisterSource(metricSource);
+
var exception = new NodeStoppingException();
requestsMap.values().forEach(fut ->
fut.future().completeExceptionally(exception));
@@ -722,6 +763,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
}
// TODO: IGNITE-18493 - remove/move this
+
/**
* Installs a predicate, it will be consulted with for each message being
sent; when it returns {@code true}, the
* message will be dropped (it will not be sent; the corresponding future
will time out soon for {@code invoke()} methods
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/MessagingServiceMetricSource.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/MessagingServiceMetricSource.java
new file mode 100644
index 00000000000..2c26389c226
--- /dev/null
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/MessagingServiceMetricSource.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.metrics.MetricSource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Metric source for the {@link MessagingService}.
+ */
+class MessagingServiceMetricSource implements MetricSource {
+ /** Metrics map. Only modified in {@code synchronized} context. */
+ private final Map<String, Metric> metrics = new HashMap<>();
+
+ /** Enabled flag. Only modified in {@code synchronized} context. */
+ private boolean enabled;
+
+ @Override
+ public String name() {
+ return "messaging";
+ }
+
+ @Override
+ public @Nullable String description() {
+ return "Metrics for the messaging service.";
+ }
+
+ /** Adds metric to the source. */
+ synchronized <T extends Metric> T addMetric(T metric) {
+ assert !enabled : "Cannot add metrics when source is enabled";
+
+ metrics.put(metric.name(), metric);
+
+ return metric;
+ }
+
+ @Override
+ public synchronized @Nullable MetricSet enable() {
+ if (enabled) {
+ return null;
+ }
+
+ enabled = true;
+
+ return new MetricSet(name(), description(), group(),
Map.copyOf(metrics));
+ }
+
+ @Override
+ public synchronized void disable() {
+ enabled = false;
+ }
+
+ @Override
+ public synchronized boolean enabled() {
+ return enabled;
+ }
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/MessagingServiceMetrics.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/MessagingServiceMetrics.java
new file mode 100644
index 00000000000..3390e7f09b7
--- /dev/null
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/MessagingServiceMetrics.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+
+class MessagingServiceMetrics {
+ private final AtomicLongMetric messageHandlingFailures;
+
+ private final AtomicLongMetric messageRecipientNotFound;
+
+ private final AtomicLongMetric invokeTimeouts;
+
+ private final AtomicLongMetric slowResponses;
+
+ MessagingServiceMetrics(MessagingServiceMetricSource source) {
+ messageHandlingFailures = source.addMetric(new AtomicLongMetric(
+ "messageHandlingFailures",
+ "Total number of message handling failures."
+ ));
+
+ messageRecipientNotFound = source.addMetric(new AtomicLongMetric(
+ "messageRecipientNotFound",
+ "Total number of message recipient resolution failures."
+ ));
+
+ invokeTimeouts = source.addMetric(new AtomicLongMetric(
+ "invokeTimeouts",
+ "Total number of invocation timeouts."
+ ));
+
+ slowResponses = source.addMetric(new AtomicLongMetric(
+ "slowResponses",
+ "Total number of responses that took long to generate (>
100ms)."
+ ));
+ }
+
+ void incrementMessageHandlingFailures() {
+ messageHandlingFailures.increment();
+ }
+
+ void incrementMessageRecipientNotFound() {
+ messageRecipientNotFound.increment();
+ }
+
+ void incrementInvokeTimeouts() {
+ invokeTimeouts.increment();
+ }
+
+ void incrementSlowResponses() {
+ slowResponses.increment();
+ }
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterService.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterService.java
index 777a2e9a330..62a4fb881f7 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterService.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterService.java
@@ -46,6 +46,7 @@ import
org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.ChannelTypeRegistry;
import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.ClusterNodeImpl;
@@ -119,7 +120,8 @@ public class ScaleCubeClusterService implements
ClusterService {
CriticalWorkerRegistry criticalWorkerRegistry,
FailureProcessor failureProcessor,
ChannelTypeRegistry channelTypeRegistry,
- IgniteProductVersionSource productVersionSource
+ IgniteProductVersionSource productVersionSource,
+ MetricManager metricManager
) {
this.config = networkConfiguration;
this.serializationRegistry = serializationRegistry;
@@ -167,6 +169,7 @@ public class ScaleCubeClusterService implements
ClusterService {
criticalWorkerRegistry,
failureProcessor,
connectionMgr,
+ metricManager,
channelTypeRegistry
);
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index fdc8d8550cb..66b37398422 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -64,6 +64,7 @@ import
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.messages.AllTypesMessageImpl;
import org.apache.ignite.internal.network.messages.InstantContainer;
@@ -633,6 +634,7 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
criticalWorkerRegistry,
failureProcessor,
connectionManager,
+ new NoOpMetricManager(),
channelTypeRegistry
);
diff --git
a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/scalecube/TestScaleCubeClusterService.java
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/scalecube/TestScaleCubeClusterService.java
index 53f15d2bc73..c6aa20df65e 100644
---
a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/scalecube/TestScaleCubeClusterService.java
+++
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/scalecube/TestScaleCubeClusterService.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.scalecube;
import io.scalecube.cluster.ClusterConfig;
import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.ChannelTypeRegistry;
import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
@@ -44,7 +45,8 @@ public class TestScaleCubeClusterService extends
ScaleCubeClusterService {
CriticalWorkerRegistry criticalWorkerRegistry,
FailureProcessor failureProcessor,
ChannelTypeRegistry channelTypeRegistry,
- IgniteProductVersionSource productVersionSource
+ IgniteProductVersionSource productVersionSource,
+ MetricManager metricManager
) {
super(
consistentId,
@@ -56,7 +58,8 @@ public class TestScaleCubeClusterService extends
ScaleCubeClusterService {
criticalWorkerRegistry,
failureProcessor,
channelTypeRegistry,
- productVersionSource
+ productVersionSource,
+ metricManager
);
}
diff --git
a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/utils/ClusterServiceTestUtils.java
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/utils/ClusterServiceTestUtils.java
index d8e9625eae9..c68b67a6cca 100644
---
a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/utils/ClusterServiceTestUtils.java
+++
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/utils/ClusterServiceTestUtils.java
@@ -35,6 +35,7 @@ import
org.apache.ignite.internal.configuration.validation.TestConfigurationVali
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ChannelTypeRegistry;
import org.apache.ignite.internal.network.ChannelTypeRegistryProvider;
import org.apache.ignite.internal.network.ClusterIdSupplier;
@@ -249,7 +250,8 @@ public class ClusterServiceTestUtils {
new NoOpCriticalWorkerRegistry(),
new FailureManager(new NoOpFailureHandler()),
defaultChannelTypeRegistry(),
- productVersionSource
+ productVersionSource,
+ new NoOpMetricManager()
) {
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
index c2fbbe328b4..e52c334c783 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
@@ -198,7 +199,8 @@ public class ItTruncateSuffixAndRestartTest extends
BaseIgniteAbstractTest {
new NoOpCriticalWorkerRegistry(),
mock(FailureManager.class),
defaultChannelTypeRegistry(),
- new DefaultIgniteProductVersionSource()
+ new DefaultIgniteProductVersionSource(),
+ new NoOpMetricManager()
);
assertThat(clusterSvc.startAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
index c73981c88f0..6f2ee5385ac 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
@@ -23,6 +23,7 @@ import static io.micronaut.http.HttpRequest.POST;
import static io.micronaut.http.HttpStatus.NOT_FOUND;
import static io.micronaut.http.MediaType.TEXT_PLAIN_TYPE;
import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource.THREAD_POOLS_METRICS_SOURCE_NAME;
import static
org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.assertThrowsProblem;
import static
org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.hasStatus;
import static org.apache.ignite.internal.rest.matcher.ProblemMatcher.isProblem;
@@ -72,7 +73,12 @@ class ItMetricControllerTest extends
ClusterPerClassIntegrationTest {
new MetricSource("placement-driver", true),
new MetricSource("clock.service", true),
new MetricSource("index.builder", true),
- new MetricSource("raft.snapshots", true)
+ new MetricSource("raft.snapshots", true),
+ new MetricSource("messaging", true),
+ new MetricSource(THREAD_POOLS_METRICS_SOURCE_NAME +
".striped.messaging.inbound.default", true),
+ new MetricSource(THREAD_POOLS_METRICS_SOURCE_NAME +
".striped.messaging.inbound.deploymentunits", true),
+ new MetricSource(THREAD_POOLS_METRICS_SOURCE_NAME +
".striped.messaging.inbound.scalecube", true),
+ new MetricSource(THREAD_POOLS_METRICS_SOURCE_NAME +
".messaging.outbound", true),
};
@Inject
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 739b7f2b0df..35fae54f048 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -407,7 +407,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
workerRegistry,
failureProcessor,
defaultChannelTypeRegistry(),
- new DefaultIgniteProductVersionSource()
+ new DefaultIgniteProductVersionSource(),
+ new NoOpMetricManager()
);
var hybridClock = new HybridClockImpl();
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 24b761491b2..fdfbc0440f7 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -640,7 +640,8 @@ public class IgniteImpl implements Ignite {
criticalWorkerRegistry,
failureManager,
ChannelTypeRegistryProvider.loadByServiceLoader(serviceProviderClassLoader),
- new DefaultIgniteProductVersionSource()
+ new DefaultIgniteProductVersionSource(),
+ metricManager
);
clock = new HybridClockImpl(failureManager);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java
index 17fa2df3744..dad359ece50 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java
@@ -88,7 +88,7 @@ public class QueryTaskExecutorImpl implements
QueryTaskExecutor {
0
);
- metricManager.registerSource(new
StripedThreadPoolMetricSource(QUERY_EXECUTOR_SOURCE_NAME, null,
stripedThreadPoolExecutor));
+ metricManager.registerSource(new
StripedThreadPoolMetricSource<>(QUERY_EXECUTOR_SOURCE_NAME, null,
stripedThreadPoolExecutor));
metricManager.enable(QUERY_EXECUTOR_SOURCE_NAME);
}
diff --git a/modules/workers/build.gradle b/modules/workers/build.gradle
index f16c09e0def..2bbd7c585cc 100644
--- a/modules/workers/build.gradle
+++ b/modules/workers/build.gradle
@@ -30,6 +30,7 @@ dependencies {
implementation project(':ignite-configuration-api')
implementation project(':ignite-configuration-root')
implementation project(':ignite-failure-handler')
+ implementation project(':ignite-metrics')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
implementation libs.auto.service.annotations
diff --git
a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
index 531c61642c1..de62961f5cb 100644
---
a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
+++
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
@@ -19,11 +19,17 @@ package org.apache.ignite.internal.worker;
import static java.util.concurrent.TimeUnit.SECONDS;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricSource;
+import org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Single thread executor instrumented to be used as a {@link CriticalWorker}
and being monitored by the {@link CriticalWorkerWatchdog}.
@@ -34,6 +40,9 @@ public class CriticalSingleThreadExecutor extends
ThreadPoolExecutor implements
private volatile Thread lastSeenThread;
private volatile long heartbeatNanos = NOT_MONITORED;
+ private @Nullable MetricSource metricSource;
+ private @Nullable MetricManager metricManager;
+
/** Constructor. */
public CriticalSingleThreadExecutor(ThreadFactory threadFactory) {
this(0, SECONDS, new LinkedBlockingQueue<>(), threadFactory);
@@ -44,6 +53,24 @@ public class CriticalSingleThreadExecutor extends
ThreadPoolExecutor implements
super(1, 1, keepAliveTime, unit, workQueue, threadFactory);
}
+ /**
+ * Initialize the metric source to track this thread pool's metrics.
+ *
+ * @param metricManager The metric manager used to register the source.
+ * @param name The name of the metric.
+ * @param description The metric description.
+ */
+ public void initMetricSource(MetricManager metricManager, String name,
String description) {
+ if (this.metricManager == null) {
+ this.metricManager = metricManager;
+
+ metricSource = new ThreadPoolMetricSource(name, description, null,
this);
+
+ metricManager.registerSource(metricSource);
+ metricManager.enable(metricSource);
+ }
+ }
+
@Override
protected void beforeExecute(Thread t, Runnable r) {
lastSeenThread = t;
@@ -74,4 +101,26 @@ public class CriticalSingleThreadExecutor extends
ThreadPoolExecutor implements
public long heartbeatNanos() {
return heartbeatNanos;
}
+
+ @Override
+ public void shutdown() {
+ if (metricManager != null) {
+ assert metricSource != null;
+
+ metricManager.unregisterSource(metricSource);
+ }
+
+ super.shutdown();
+ }
+
+ @Override
+ public @NotNull List<Runnable> shutdownNow() {
+ if (metricManager != null) {
+ assert metricSource != null;
+
+ metricManager.unregisterSource(metricSource);
+ }
+
+ return super.shutdownNow();
+ }
}
diff --git
a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java
index 107595b5e27..5ed192d27b6 100644
---
a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java
+++
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java
@@ -26,9 +26,14 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricSource;
+import
org.apache.ignite.internal.metrics.sources.StripedThreadPoolMetricSource;
import org.apache.ignite.internal.thread.AbstractStripedThreadPoolExecutor;
import org.apache.ignite.internal.thread.StripedExecutor;
import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Same as {@link StripedThreadPoolExecutor}, but each stripe is a critical
worker monitored for being blocked.
@@ -36,6 +41,12 @@ import
org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
public class CriticalStripedThreadPoolExecutor extends
AbstractStripedThreadPoolExecutor<ExecutorService> implements StripedExecutor {
private final List<CriticalWorker> workers;
+ @Nullable
+ private MetricManager metricManager;
+
+ @Nullable
+ private MetricSource metricSource;
+
/**
* Create a blockage-monitored striped thread pool.
*
@@ -93,4 +104,44 @@ public class CriticalStripedThreadPoolExecutor extends
AbstractStripedThreadPool
public Collection<CriticalWorker> workers() {
return workers;
}
+
+ /**
+ * Initialize the metric source to track this thread pool's metrics.
+ *
+ * @param metricManager The metric manager used to register the source.
+ * @param name The name of the metric.
+ * @param description The metric description.
+ */
+ public void initMetricSource(MetricManager metricManager, String name,
String description) {
+ if (this.metricManager == null) {
+ this.metricManager = metricManager;
+
+ metricSource = new StripedThreadPoolMetricSource<>(name,
description, null, this);
+
+ metricManager.registerSource(metricSource);
+ metricManager.enable(metricSource);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ if (metricManager != null) {
+ assert metricSource != null;
+
+ metricManager.unregisterSource(metricSource);
+ }
+
+ super.shutdown();
+ }
+
+ @Override
+ public @NotNull List<Runnable> shutdownNow() {
+ if (metricManager != null) {
+ assert metricSource != null;
+
+ metricManager.unregisterSource(metricSource);
+ }
+
+ return super.shutdownNow();
+ }
}