This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 696ce67f8a1 IGNITE-27381 Support MessageService metrics (#7488)
696ce67f8a1 is described below

commit 696ce67f8a1a2c493248bf321f2d03ff00786e2a
Author: Aditya Mukhopadhyay <[email protected]>
AuthorDate: Tue Feb 17 12:59:00 2026 +0530

    IGNITE-27381 Support MessageService metrics (#7488)
    
    Co-authored-by: Roman Puchkovskiy <[email protected]>
---
 .../ignite/internal/cli/CliIntegrationTest.java    |  8 ++-
 ...niteDistributionZoneManagerNodeRestartTest.java |  3 +-
 .../ignite/internal/metrics/MetricManagerImpl.java | 12 ++--
 .../sources/StripedThreadPoolMetricSource.java     | 13 ++--
 modules/network/build.gradle                       |  3 +
 .../internal/network/CriticalStripedExecutors.java | 10 ++-
 .../CriticalStripedThreadPoolExecutorFactory.java  | 35 ++++++++++
 .../internal/network/DefaultMessagingService.java  | 54 +++++++++++++--
 .../network/MessagingServiceMetricSource.java      | 76 ++++++++++++++++++++++
 .../internal/network/MessagingServiceMetrics.java  | 68 +++++++++++++++++++
 .../network/scalecube/ScaleCubeClusterService.java |  5 +-
 .../network/DefaultMessagingServiceTest.java       |  2 +
 .../scalecube/TestScaleCubeClusterService.java     |  7 +-
 .../network/utils/ClusterServiceTestUtils.java     |  4 +-
 .../raft/ItTruncateSuffixAndRestartTest.java       |  4 +-
 .../rest/metrics/ItMetricControllerTest.java       |  8 ++-
 .../runner/app/ItIgniteNodeRestartTest.java        |  3 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  3 +-
 .../sql/engine/exec/QueryTaskExecutorImpl.java     |  2 +-
 modules/workers/build.gradle                       |  1 +
 .../worker/CriticalSingleThreadExecutor.java       | 49 ++++++++++++++
 .../worker/CriticalStripedThreadPoolExecutor.java  | 51 +++++++++++++++
 22 files changed, 393 insertions(+), 28 deletions(-)

diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
index 695b9a7ebc9..00eae74a96f 100644
--- a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
+++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.cli;
 
+import static org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource.THREAD_POOLS_METRICS_SOURCE_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import io.micronaut.configuration.picocli.MicronautFactory;
@@ -84,7 +85,12 @@ public abstract class CliIntegrationTest extends ClusterPerClassIntegrationTest
             new MetricSource().name("resource.vacuum").enabled(true),
             new MetricSource().name("clock.service").enabled(true),
             new MetricSource().name("index.builder").enabled(true),
-            new MetricSource().name("raft.snapshots").enabled(true)
+            new MetricSource().name("raft.snapshots").enabled(true),
+            new MetricSource().name("messaging").enabled(true),
+            new MetricSource().name(THREAD_POOLS_METRICS_SOURCE_NAME + ".striped.messaging.inbound.default").enabled(true),
+            new MetricSource().name(THREAD_POOLS_METRICS_SOURCE_NAME + ".striped.messaging.inbound.deploymentunits").enabled(true),
+            new MetricSource().name(THREAD_POOLS_METRICS_SOURCE_NAME + ".striped.messaging.inbound.scalecube").enabled(true),
+            new MetricSource().name(THREAD_POOLS_METRICS_SOURCE_NAME + ".messaging.outbound").enabled(true),
     };
 
     /** Correct ignite jdbc url. */
diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 98bf819ca5e..00f875ed99d 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -253,7 +253,8 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
                 new NoOpCriticalWorkerRegistry(),
                 failureProcessor,
                 defaultChannelTypeRegistry(),
-                new DefaultIgniteProductVersionSource()
+                new DefaultIgniteProductVersionSource(),
+                new NoOpMetricManager()
         );
 
         var logicalTopology = new LogicalTopologyImpl(clusterStateStorage, failureProcessor);
diff --git a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManagerImpl.java b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManagerImpl.java
index 7fb5bda92b2..ebb67a26119 100644
--- a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManagerImpl.java
+++ b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManagerImpl.java
@@ -187,16 +187,20 @@ public class MetricManagerImpl implements MetricManager {
     @Override
     public void unregisterSource(MetricSource src) {
         inBusyLockSafe(busyLock, () -> {
-            disable(src);
-            registry.unregisterSource(src);
+            if (metricSources().contains(src)) {
+                disable(src);
+                registry.unregisterSource(src);
+            }
         });
     }
 
     @Override
     public void unregisterSource(String srcName) {
         inBusyLockSafe(busyLock, () -> {
-            disable(srcName);
-            registry.unregisterSource(srcName);
+            if (metricSources().stream().anyMatch(metricSource -> metricSource.name().equals(srcName))) {
+                disable(srcName);
+                registry.unregisterSource(srcName);
+            }
         });
     }
 
diff --git a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/StripedThreadPoolMetricSource.java b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/StripedThreadPoolMetricSource.java
index dc6665b2ad4..1392c43fbc8 100644
--- a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/StripedThreadPoolMetricSource.java
+++ b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/StripedThreadPoolMetricSource.java
@@ -22,18 +22,21 @@ import static org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource.
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.ignite.internal.metrics.AbstractMetricSource;
 import org.apache.ignite.internal.metrics.IntGauge;
 import org.apache.ignite.internal.metrics.LongGauge;
 import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.thread.AbstractStripedThreadPoolExecutor;
 import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 
 /** Metric source for monitoring of {@link StripedThreadPoolExecutor}. */
-public class StripedThreadPoolMetricSource extends AbstractMetricSource<StripedThreadPoolMetricSource.Holder> {
+public class StripedThreadPoolMetricSource<T extends AbstractStripedThreadPoolExecutor<? extends ExecutorService>> extends
+        AbstractMetricSource<StripedThreadPoolMetricSource<T>.Holder> {
     /** Striped thread pool to be monitored. */
-    private final StripedThreadPoolExecutor exec;
+    private final T exec;
 
     /**
      * Creates a new thread pool metric source with the given {@code name} to monitor the provided striped executor {@code exec},
@@ -44,8 +47,8 @@ public class StripedThreadPoolMetricSource extends AbstractMetricSource<StripedT
      * @param exec Striped thread pool executor to monitor.
      * @see StripedThreadPoolExecutor
      */
-    public StripedThreadPoolMetricSource(String name, @Nullable String description, StripedThreadPoolExecutor exec) {
-        this(name, null, THREAD_POOLS_GROUP_NAME, exec);
+    public StripedThreadPoolMetricSource(String name, @Nullable String description, T exec) {
+        this(name, description, THREAD_POOLS_GROUP_NAME, exec);
     }
 
     /**
@@ -60,7 +63,7 @@ public class StripedThreadPoolMetricSource extends AbstractMetricSource<StripedT
             String name,
             @Nullable String description,
             @Nullable String group,
-            StripedThreadPoolExecutor exec
+            T exec
     ) {
         super(name, description, group);
 
diff --git a/modules/network/build.gradle b/modules/network/build.gradle
index fbfe99977b9..bf931fa03cc 100644
--- a/modules/network/build.gradle
+++ b/modules/network/build.gradle
@@ -33,6 +33,7 @@ dependencies {
     implementation project(':ignite-vault')
     implementation project(':ignite-workers')
     implementation project(':ignite-failure-handler')
+    implementation project(':ignite-metrics')
     implementation libs.jetbrains.annotations
     implementation libs.scalecube.cluster
     implementation libs.fastutil.core
@@ -57,6 +58,7 @@ dependencies {
     testImplementation testFixtures(project(':ignite-configuration'))
     testImplementation testFixtures(project(':ignite-core'))
     testImplementation testFixtures(project(':ignite-failure-handler'))
+    testImplementation testFixtures(project(':ignite-metrics'))
     testImplementation libs.jmh.core
     testImplementation(libs.kryo) {
         //IDEA test runner don't apply Gradle dependency resolve strategy, this is just not implemented
@@ -73,6 +75,7 @@ dependencies {
     testFixturesImplementation project(':ignite-configuration')
     testFixturesImplementation project(':ignite-configuration-root')
     testFixturesImplementation project(':ignite-failure-handler')
+    testFixturesImplementation testFixtures(project(':ignite-metrics'))
     testFixturesImplementation testFixtures(project(':ignite-core'))
     testFixturesImplementation testFixtures(project(':ignite-configuration'))
     testFixturesImplementation testFixtures(project(':ignite-workers'))
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedExecutors.java b/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedExecutors.java
index 4b39cd54dbd..49024eae8e6 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedExecutors.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedExecutors.java
@@ -27,9 +27,11 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.thread.StripedExecutor;
 import org.apache.ignite.internal.worker.CriticalWorker;
 import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Collection of {@link StripedExecutor executors} for the network based on {@link ChannelType#id()}.
@@ -51,11 +53,15 @@ class CriticalStripedExecutors implements ManuallyCloseable {
             String poolNamePrefix,
             CriticalWorkerRegistry workerRegistry,
             ChannelTypeRegistry channelTypeRegistry,
-            IgniteLogger log
+            IgniteLogger log,
+            @Nullable MetricManager metricManager,
+            @Nullable String metricNamePrefix,
+            @Nullable String metricDescription
     ) {
         this.workerRegistry = workerRegistry;
 
-        var factory = new CriticalStripedThreadPoolExecutorFactory(nodeName, poolNamePrefix, log, workerRegistry, registeredWorkers);
+        var factory = new CriticalStripedThreadPoolExecutorFactory(nodeName, poolNamePrefix, log, workerRegistry, registeredWorkers,
+                metricManager, metricNamePrefix, metricDescription);
 
         executorByChannelTypeId = StripedExecutorByChannelTypeId.of(channelTypeRegistry, factory);
     }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedThreadPoolExecutorFactory.java b/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedThreadPoolExecutorFactory.java
index d2ee399c759..4ac37826555 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedThreadPoolExecutorFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/CriticalStripedThreadPoolExecutorFactory.java
@@ -21,9 +21,11 @@ import static org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED;
 
 import java.util.List;
 import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.worker.CriticalStripedThreadPoolExecutor;
 import org.apache.ignite.internal.worker.CriticalWorker;
 import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
+import org.jetbrains.annotations.Nullable;
 
 /** Factory for creating {@link CriticalStripedThreadPoolExecutor}. */
 class CriticalStripedThreadPoolExecutorFactory {
@@ -43,18 +45,45 @@ class CriticalStripedThreadPoolExecutorFactory {
 
     private final List<CriticalWorker> registeredWorkers;
 
+    @Nullable
+    private final MetricManager metricManager;
+
+    @Nullable
+    private final String metricNamePrefix;
+
+    @Nullable
+    private final String metricDescription;
+
+    @SuppressWarnings("unused")
     CriticalStripedThreadPoolExecutorFactory(
             String nodeName,
             String poolNamePrefix,
             IgniteLogger log,
             CriticalWorkerRegistry workerRegistry,
             List<CriticalWorker> registeredWorkers
+    ) {
+        this(nodeName, poolNamePrefix, log, workerRegistry, registeredWorkers, null, null, null);
+    }
+
+    CriticalStripedThreadPoolExecutorFactory(
+            String nodeName,
+            String poolNamePrefix,
+            IgniteLogger log,
+            CriticalWorkerRegistry workerRegistry,
+            List<CriticalWorker> registeredWorkers,
+            @Nullable MetricManager metricManager,
+            @Nullable String metricNamePrefix,
+            @Nullable String metricDescription
     ) {
         this.nodeName = nodeName;
         this.poolNamePrefix = poolNamePrefix;
         this.log = log;
         this.workerRegistry = workerRegistry;
         this.registeredWorkers = registeredWorkers;
+
+        this.metricManager = metricManager;
+        this.metricNamePrefix = metricNamePrefix;
+        this.metricDescription = metricDescription;
     }
 
     CriticalStripedThreadPoolExecutor create(ChannelType channelType) {
@@ -64,6 +93,12 @@ class CriticalStripedThreadPoolExecutorFactory {
         var threadFactory = IgniteMessageServiceThreadFactory.create(nodeName, poolName, log, NOTHING_ALLOWED);
         var executor = new CriticalStripedThreadPoolExecutor(stripeCountForIndex(channelTypeId), threadFactory, false, 0);
 
+        if (metricManager != null && metricNamePrefix != null) {
+            String metricName = String.format("%s.%s", metricNamePrefix, channelType.name().toLowerCase());
+
+            executor.initMetricSource(metricManager, metricName, metricDescription);
+        }
+
         for (CriticalWorker worker : executor.workers()) {
             workerRegistry.register(worker);
             registeredWorkers.add(worker);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index 08f8960a82f..54cf63c323a 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.lang.IgniteSystemProperties.LONG_HANDLING_LOGGING_ENABLED;
+import static org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource.THREAD_POOLS_METRICS_SOURCE_NAME;
 import static org.apache.ignite.internal.network.NettyBootstrapFactory.isInNetworkThread;
 import static org.apache.ignite.internal.network.serialization.PerSessionSerializationService.createClassDescriptorsMessages;
 import static org.apache.ignite.internal.thread.ThreadOperation.NOTHING_ALLOWED;
@@ -58,6 +59,7 @@ import org.apache.ignite.internal.lang.IgniteSystemProperties;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.handshake.CriticalHandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
@@ -103,6 +105,12 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
     private final FailureProcessor failureProcessor;
 
+    private final MetricManager metricManager;
+
+    private final MessagingServiceMetricSource metricSource = new MessagingServiceMetricSource();
+
+    private final MessagingServiceMetrics metrics = new MessagingServiceMetrics(metricSource);
+
     /** Connection manager that provides access to {@link NettySender}. */
     private final ConnectionManager connectionManager;
 
@@ -146,6 +154,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
      * @param marshaller Marshaller.
      * @param criticalWorkerRegistry Used to register critical threads managed by the new service and its components.
      * @param failureProcessor Failure processor.
+     * @param metricManager The metrics manager.
      * @param channelTypeRegistry {@link ChannelType} registry.
      */
     public DefaultMessagingService(
@@ -158,6 +167,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
             CriticalWorkerRegistry criticalWorkerRegistry,
             FailureProcessor failureProcessor,
             ConnectionManager connectionManager,
+            MetricManager metricManager,
             ChannelTypeRegistry channelTypeRegistry
     ) {
         this.factory = factory;
@@ -167,6 +177,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
         this.marshaller = marshaller;
         this.criticalWorkerRegistry = criticalWorkerRegistry;
         this.failureProcessor = failureProcessor;
+        this.metricManager = metricManager;
 
         this.connectionManager = connectionManager;
         connectionManager.addListener(this::handleMessageFromNetwork);
@@ -174,13 +185,18 @@ public class DefaultMessagingService extends AbstractMessagingService {
         outboundExecutor = new CriticalSingleThreadExecutor(
                 IgniteMessageServiceThreadFactory.create(nodeName, "MessagingService-outbound", LOG, NOTHING_ALLOWED)
         );
+        outboundExecutor.initMetricSource(metricManager, THREAD_POOLS_METRICS_SOURCE_NAME + ".messaging.outbound",
+                "Outbound message executor metrics");
 
         inboundExecutors = new CriticalStripedExecutors(
                 nodeName,
                 "MessagingService-inbound",
                 criticalWorkerRegistry,
                 channelTypeRegistry,
-                LOG
+                LOG,
+                metricManager,
+                THREAD_POOLS_METRICS_SOURCE_NAME + ".striped.messaging.inbound",
+                "Inbound message executor metrics"
         );
 
         timeoutWorker = new TimeoutWorker(
@@ -207,6 +223,8 @@ public class DefaultMessagingService extends AbstractMessagingService {
         InternalClusterNode recipient = topologyService.getByConsistentId(recipientConsistentId);
 
         if (recipient == null) {
+            metrics.incrementMessageRecipientNotFound();
+
             return failedFuture(
                     new UnresolvableConsistentIdException("Recipient consistent ID cannot be resolved: " + recipientConsistentId)
             );
@@ -237,6 +255,8 @@ public class DefaultMessagingService extends AbstractMessagingService {
         InternalClusterNode recipient = topologyService.getByConsistentId(recipientConsistentId);
 
         if (recipient == null) {
+            metrics.incrementMessageRecipientNotFound();
+
             return failedFuture(
                     new UnresolvableConsistentIdException("Recipient consistent ID cannot be resolved: " + recipientConsistentId)
             );
@@ -256,6 +276,8 @@ public class DefaultMessagingService extends AbstractMessagingService {
         InternalClusterNode recipient = topologyService.getByConsistentId(recipientConsistentId);
 
         if (recipient == null) {
+            metrics.incrementMessageRecipientNotFound();
+
             return failedFuture(
                     new UnresolvableConsistentIdException("Recipient consistent ID cannot be resolved: " + recipientConsistentId)
             );
@@ -398,6 +420,8 @@ public class DefaultMessagingService extends AbstractMessagingService {
                 .thenComposeToCompletable(sender -> {
                     if (strictIdCheck && nodeId != null && !sender.launchId().equals(nodeId)) {
                         // The destination node has been rebooted, so it's a different node instance.
+                        metrics.incrementMessageRecipientNotFound();
+
                         throw new RecipientLeftException("Target node ID is " + nodeId + ", but " + sender.launchId() + " responded");
                     }
 
@@ -447,6 +471,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
         List<HandlerContext> handlerContexts = getHandlerContexts(message.groupType());
 
         // Specially made by a classic loop for optimization.
+        //noinspection ForLoopReplaceableByForEach
         for (int i = 0; i < handlerContexts.size(); i++) {
             HandlerContext handlerContext = handlerContexts.get(i);
 
@@ -497,17 +522,21 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         Long finalCorrelationId = correlationId;
         firstHandlerExecutor.execute(() -> {
-            long startedNanos = longHandlingLoggingEnabled ? System.nanoTime() : 0;
+            long startedNanos = System.nanoTime();
 
             try {
                 handleStartingWithFirstHandler(payload, finalCorrelationId, inNetworkObject, firstHandlerContext, handlerContexts);
             } catch (Throwable e) {
+                metrics.incrementMessageHandlingFailures();
+
                 handleAndRethrowIfError(inNetworkObject, e);
             } finally {
-                if (longHandlingLoggingEnabled && LOG.isWarnEnabled()) {
-                    long tookMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNanos);
+                long tookMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNanos);
+
+                if (tookMillis > 100) {
+                    metrics.incrementSlowResponses();
 
-                    if (tookMillis > 100) {
+                    if (longHandlingLoggingEnabled && LOG.isWarnEnabled()) {
                         LOG.warn(
                                 "Processing of {} from {} took {} ms",
                                 LOG.isDebugEnabled() && includeSensitive() ? message : message.toStringForLightLogging(),
@@ -647,7 +676,14 @@ public class DefaultMessagingService extends AbstractMessagingService {
         TimeoutObjectImpl responseFuture = requestsMap.remove(correlationId);
 
         if (responseFuture != null) {
-            responseFuture.future().complete(response);
+            var fut = responseFuture.future();
+
+            fut.complete(response);
+
+            // Check if it was already completed exceptionally by the timeout worker.
+            if (fut.isCompletedExceptionally()) {
+                metrics.incrementInvokeTimeouts();
+            }
         }
     }
 
@@ -698,12 +734,17 @@ public class DefaultMessagingService extends AbstractMessagingService {
                 recipientInetAddrByNodeId.remove(member.id());
             }
         });
+
+        metricManager.registerSource(metricSource);
+        metricManager.enable(metricSource);
     }
 
     /**
      * Stops the messaging service.
      */
     public void stop() throws Exception {
+        metricManager.unregisterSource(metricSource);
+
         var exception = new NodeStoppingException();
 
         requestsMap.values().forEach(fut -> fut.future().completeExceptionally(exception));
@@ -722,6 +763,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
     }
 
     // TODO: IGNITE-18493 - remove/move this
+
     /**
      * Installs a predicate, it will be consulted with for each message being sent; when it returns {@code true}, the
      * message will be dropped (it will not be sent; the corresponding future will time out soon for {@code invoke()} methods
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/MessagingServiceMetricSource.java b/modules/network/src/main/java/org/apache/ignite/internal/network/MessagingServiceMetricSource.java
new file mode 100644
index 00000000000..2c26389c226
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/MessagingServiceMetricSource.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.metrics.MetricSource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Metric source for the {@link MessagingService}.
+ */
+class MessagingServiceMetricSource implements MetricSource {
+    /** Metrics map. Only modified in {@code synchronized} context. */
+    private final Map<String, Metric> metrics = new HashMap<>();
+
+    /** Enabled flag. Only modified in {@code synchronized} context. */
+    private boolean enabled;
+
+    @Override
+    public String name() {
+        return "messaging";
+    }
+
+    @Override
+    public @Nullable String description() {
+        return "Metrics for the messaging service.";
+    }
+
+    /** Adds metric to the source. */
+    synchronized <T extends Metric> T addMetric(T metric) {
+        assert !enabled : "Cannot add metrics when source is enabled";
+
+        metrics.put(metric.name(), metric);
+
+        return metric;
+    }
+
+    @Override
+    public synchronized @Nullable MetricSet enable() {
+        if (enabled) {
+            return null;
+        }
+
+        enabled = true;
+
+        return new MetricSet(name(), description(), group(), Map.copyOf(metrics));
+    }
+
+    @Override
+    public synchronized void disable() {
+        enabled = false;
+    }
+
+    @Override
+    public synchronized boolean enabled() {
+        return enabled;
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/MessagingServiceMetrics.java b/modules/network/src/main/java/org/apache/ignite/internal/network/MessagingServiceMetrics.java
new file mode 100644
index 00000000000..3390e7f09b7
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/MessagingServiceMetrics.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network;
+
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+
+class MessagingServiceMetrics {
+    private final AtomicLongMetric messageHandlingFailures;
+
+    private final AtomicLongMetric messageRecipientNotFound;
+
+    private final AtomicLongMetric invokeTimeouts;
+
+    private final AtomicLongMetric slowResponses;
+
+    MessagingServiceMetrics(MessagingServiceMetricSource source) {
+        messageHandlingFailures = source.addMetric(new AtomicLongMetric(
+                "messageHandlingFailures",
+                "Total number of message handling failures."
+        ));
+
+        messageRecipientNotFound = source.addMetric(new AtomicLongMetric(
+                "messageRecipientNotFound",
+                "Total number of message recipient resolution failures."
+        ));
+
+        invokeTimeouts = source.addMetric(new AtomicLongMetric(
+                "invokeTimeouts",
+                "Total number of invocation timeouts."
+        ));
+
+        slowResponses = source.addMetric(new AtomicLongMetric(
+                "slowResponses",
+                "Total number of responses that took long to generate (> 100ms)."
+        ));
+    }
+
+    void incrementMessageHandlingFailures() {
+        messageHandlingFailures.increment();
+    }
+
+    void incrementMessageRecipientNotFound() {
+        messageRecipientNotFound.increment();
+    }
+
+    void incrementInvokeTimeouts() {
+        invokeTimeouts.increment();
+    }
+
+    void incrementSlowResponses() {
+        slowResponses.increment();
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterService.java
index 777a2e9a330..62a4fb881f7 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterService.java
@@ -46,6 +46,7 @@ import 
org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.ChannelTypeRegistry;
 import org.apache.ignite.internal.network.ClusterIdSupplier;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
@@ -119,7 +120,8 @@ public class ScaleCubeClusterService implements 
ClusterService {
             CriticalWorkerRegistry criticalWorkerRegistry,
             FailureProcessor failureProcessor,
             ChannelTypeRegistry channelTypeRegistry,
-            IgniteProductVersionSource productVersionSource
+            IgniteProductVersionSource productVersionSource,
+            MetricManager metricManager
     ) {
         this.config = networkConfiguration;
         this.serializationRegistry = serializationRegistry;
@@ -167,6 +169,7 @@ public class ScaleCubeClusterService implements 
ClusterService {
                 criticalWorkerRegistry,
                 failureProcessor,
                 connectionMgr,
+                metricManager,
                 channelTypeRegistry
         );
 
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index fdc8d8550cb..66b37398422 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -64,6 +64,7 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.messages.AllTypesMessageImpl;
 import org.apache.ignite.internal.network.messages.InstantContainer;
@@ -633,6 +634,7 @@ class DefaultMessagingServiceTest extends 
BaseIgniteAbstractTest {
                 criticalWorkerRegistry,
                 failureProcessor,
                 connectionManager,
+                new NoOpMetricManager(),
                 channelTypeRegistry
         );
 
diff --git 
a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/scalecube/TestScaleCubeClusterService.java
 
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/scalecube/TestScaleCubeClusterService.java
index 53f15d2bc73..c6aa20df65e 100644
--- 
a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/scalecube/TestScaleCubeClusterService.java
+++ 
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/scalecube/TestScaleCubeClusterService.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.scalecube;
 
 import io.scalecube.cluster.ClusterConfig;
 import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.ChannelTypeRegistry;
 import org.apache.ignite.internal.network.ClusterIdSupplier;
 import org.apache.ignite.internal.network.NettyBootstrapFactory;
@@ -44,7 +45,8 @@ public class TestScaleCubeClusterService extends 
ScaleCubeClusterService {
             CriticalWorkerRegistry criticalWorkerRegistry,
             FailureProcessor failureProcessor,
             ChannelTypeRegistry channelTypeRegistry,
-            IgniteProductVersionSource productVersionSource
+            IgniteProductVersionSource productVersionSource,
+            MetricManager metricManager
     ) {
         super(
                 consistentId,
@@ -56,7 +58,8 @@ public class TestScaleCubeClusterService extends 
ScaleCubeClusterService {
                 criticalWorkerRegistry,
                 failureProcessor,
                 channelTypeRegistry,
-                productVersionSource
+                productVersionSource,
+                metricManager
         );
     }
 
diff --git 
a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/utils/ClusterServiceTestUtils.java
 
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/utils/ClusterServiceTestUtils.java
index d8e9625eae9..c68b67a6cca 100644
--- 
a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/utils/ClusterServiceTestUtils.java
+++ 
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/utils/ClusterServiceTestUtils.java
@@ -35,6 +35,7 @@ import 
org.apache.ignite.internal.configuration.validation.TestConfigurationVali
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
 import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.ChannelTypeRegistry;
 import org.apache.ignite.internal.network.ChannelTypeRegistryProvider;
 import org.apache.ignite.internal.network.ClusterIdSupplier;
@@ -249,7 +250,8 @@ public class ClusterServiceTestUtils {
                 new NoOpCriticalWorkerRegistry(),
                 new FailureManager(new NoOpFailureHandler()),
                 defaultChannelTypeRegistry(),
-                productVersionSource
+                productVersionSource,
+                new NoOpMetricManager()
         ) {
             @Override
             public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
index c2fbbe328b4..e52c334c783 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.NettyBootstrapFactory;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
@@ -198,7 +199,8 @@ public class ItTruncateSuffixAndRestartTest extends 
BaseIgniteAbstractTest {
                     new NoOpCriticalWorkerRegistry(),
                     mock(FailureManager.class),
                     defaultChannelTypeRegistry(),
-                    new DefaultIgniteProductVersionSource()
+                    new DefaultIgniteProductVersionSource(),
+                    new NoOpMetricManager()
             );
 
             assertThat(clusterSvc.startAsync(new ComponentContext()), willCompleteSuccessfully());
diff --git 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
index c73981c88f0..6f2ee5385ac 100644
--- 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
+++ 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
@@ -23,6 +23,7 @@ import static io.micronaut.http.HttpRequest.POST;
 import static io.micronaut.http.HttpStatus.NOT_FOUND;
 import static io.micronaut.http.MediaType.TEXT_PLAIN_TYPE;
 import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource.THREAD_POOLS_METRICS_SOURCE_NAME;
 import static org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.assertThrowsProblem;
 import static org.apache.ignite.internal.rest.matcher.MicronautHttpResponseMatcher.hasStatus;
 import static org.apache.ignite.internal.rest.matcher.ProblemMatcher.isProblem;
@@ -72,7 +73,12 @@ class ItMetricControllerTest extends 
ClusterPerClassIntegrationTest {
             new MetricSource("placement-driver", true),
             new MetricSource("clock.service", true),
             new MetricSource("index.builder", true),
-            new MetricSource("raft.snapshots", true)
+            new MetricSource("raft.snapshots", true),
+            new MetricSource("messaging", true),
+            new MetricSource(THREAD_POOLS_METRICS_SOURCE_NAME + ".striped.messaging.inbound.default", true),
+            new MetricSource(THREAD_POOLS_METRICS_SOURCE_NAME + ".striped.messaging.inbound.deploymentunits", true),
+            new MetricSource(THREAD_POOLS_METRICS_SOURCE_NAME + ".striped.messaging.inbound.scalecube", true),
+            new MetricSource(THREAD_POOLS_METRICS_SOURCE_NAME + ".messaging.outbound", true),
     };
 
     @Inject
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 739b7f2b0df..35fae54f048 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -407,7 +407,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 workerRegistry,
                 failureProcessor,
                 defaultChannelTypeRegistry(),
-                new DefaultIgniteProductVersionSource()
+                new DefaultIgniteProductVersionSource(),
+                new NoOpMetricManager()
         );
 
         var hybridClock = new HybridClockImpl();
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 24b761491b2..fdfbc0440f7 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -640,7 +640,8 @@ public class IgniteImpl implements Ignite {
                 criticalWorkerRegistry,
                 failureManager,
                 ChannelTypeRegistryProvider.loadByServiceLoader(serviceProviderClassLoader),
-                new DefaultIgniteProductVersionSource()
+                new DefaultIgniteProductVersionSource(),
+                metricManager
         );
 
         clock = new HybridClockImpl(failureManager);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java
index 17fa2df3744..dad359ece50 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/QueryTaskExecutorImpl.java
@@ -88,7 +88,7 @@ public class QueryTaskExecutorImpl implements 
QueryTaskExecutor {
                 0
         );
 
-        metricManager.registerSource(new StripedThreadPoolMetricSource(QUERY_EXECUTOR_SOURCE_NAME, null, stripedThreadPoolExecutor));
+        metricManager.registerSource(new StripedThreadPoolMetricSource<>(QUERY_EXECUTOR_SOURCE_NAME, null, stripedThreadPoolExecutor));
         metricManager.enable(QUERY_EXECUTOR_SOURCE_NAME);
     }
 
diff --git a/modules/workers/build.gradle b/modules/workers/build.gradle
index f16c09e0def..2bbd7c585cc 100644
--- a/modules/workers/build.gradle
+++ b/modules/workers/build.gradle
@@ -30,6 +30,7 @@ dependencies {
     implementation project(':ignite-configuration-api')
     implementation project(':ignite-configuration-root')
     implementation project(':ignite-failure-handler')
+    implementation project(':ignite-metrics')
     implementation libs.jetbrains.annotations
     implementation libs.fastutil.core
     implementation libs.auto.service.annotations
diff --git 
a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
 
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
index 531c61642c1..de62961f5cb 100644
--- 
a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
+++ 
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalSingleThreadExecutor.java
@@ -19,11 +19,17 @@ package org.apache.ignite.internal.worker;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricSource;
+import org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Single thread executor instrumented to be used as a {@link CriticalWorker} and being monitored by the {@link CriticalWorkerWatchdog}.
@@ -34,6 +40,9 @@ public class CriticalSingleThreadExecutor extends 
ThreadPoolExecutor implements
     private volatile Thread lastSeenThread;
     private volatile long heartbeatNanos = NOT_MONITORED;
 
+    private @Nullable MetricSource metricSource;
+    private @Nullable MetricManager metricManager;
+
     /** Constructor. */
     public CriticalSingleThreadExecutor(ThreadFactory threadFactory) {
         this(0, SECONDS, new LinkedBlockingQueue<>(), threadFactory);
@@ -44,6 +53,24 @@ public class CriticalSingleThreadExecutor extends 
ThreadPoolExecutor implements
         super(1, 1, keepAliveTime, unit, workQueue, threadFactory);
     }
 
+    /**
+     * Initialize the metric source to track this thread pool's metrics.
+     *
+     * @param metricManager The metric manager used to register the source.
+     * @param name The name of the metric.
+     * @param description The metric description.
+     */
+    public void initMetricSource(MetricManager metricManager, String name, String description) {
+        if (this.metricManager == null) {
+            this.metricManager = metricManager;
+
+            metricSource = new ThreadPoolMetricSource(name, description, null, this);
+
+            metricManager.registerSource(metricSource);
+            metricManager.enable(metricSource);
+        }
+    }
+
     @Override
     protected void beforeExecute(Thread t, Runnable r) {
         lastSeenThread = t;
@@ -74,4 +101,26 @@ public class CriticalSingleThreadExecutor extends 
ThreadPoolExecutor implements
     public long heartbeatNanos() {
         return heartbeatNanos;
     }
+
+    @Override
+    public void shutdown() {
+        if (metricManager != null) {
+            assert metricSource != null;
+
+            metricManager.unregisterSource(metricSource);
+        }
+
+        super.shutdown();
+    }
+
+    @Override
+    public @NotNull List<Runnable> shutdownNow() {
+        if (metricManager != null) {
+            assert metricSource != null;
+
+            metricManager.unregisterSource(metricSource);
+        }
+
+        return super.shutdownNow();
+    }
 }
diff --git 
a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java
 
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java
index 107595b5e27..5ed192d27b6 100644
--- 
a/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java
+++ 
b/modules/workers/src/main/java/org/apache/ignite/internal/worker/CriticalStripedThreadPoolExecutor.java
@@ -26,9 +26,14 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.MetricSource;
+import org.apache.ignite.internal.metrics.sources.StripedThreadPoolMetricSource;
 import org.apache.ignite.internal.thread.AbstractStripedThreadPoolExecutor;
 import org.apache.ignite.internal.thread.StripedExecutor;
 import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Same as {@link StripedThreadPoolExecutor}, but each stripe is a critical worker monitored for being blocked.
@@ -36,6 +41,12 @@ import 
org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 public class CriticalStripedThreadPoolExecutor extends AbstractStripedThreadPoolExecutor<ExecutorService> implements StripedExecutor {
     private final List<CriticalWorker> workers;
 
+    @Nullable
+    private MetricManager metricManager;
+
+    @Nullable
+    private MetricSource metricSource;
+
     /**
      * Create a blockage-monitored striped thread pool.
      *
@@ -93,4 +104,44 @@ public class CriticalStripedThreadPoolExecutor extends 
AbstractStripedThreadPool
     public Collection<CriticalWorker> workers() {
         return workers;
     }
+
+    /**
+     * Initialize the metric source to track this thread pool's metrics.
+     *
+     * @param metricManager The metric manager used to register the source.
+     * @param name The name of the metric.
+     * @param description The metric description.
+     */
+    public void initMetricSource(MetricManager metricManager, String name, String description) {
+        if (this.metricManager == null) {
+            this.metricManager = metricManager;
+
+            metricSource = new StripedThreadPoolMetricSource<>(name, description, null, this);
+
+            metricManager.registerSource(metricSource);
+            metricManager.enable(metricSource);
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        if (metricManager != null) {
+            assert metricSource != null;
+
+            metricManager.unregisterSource(metricSource);
+        }
+
+        super.shutdown();
+    }
+
+    @Override
+    public @NotNull List<Runnable> shutdownNow() {
+        if (metricManager != null) {
+            assert metricSource != null;
+
+            metricManager.unregisterSource(metricSource);
+        }
+
+        return super.shutdownNow();
+    }
 }

Reply via email to