This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new aab140ff2aa IGNITE-27150 Add raft snapshot metrics (#7269)
aab140ff2aa is described below

commit aab140ff2aa31083150fbb6cdeed79bc22069639
Author: Ivan Zlenko <[email protected]>
AuthorDate: Tue Dec 23 11:41:29 2025 +0500

    IGNITE-27150 Add raft snapshot metrics (#7269)
---
 .../administrators-guide/metrics/metrics-list.adoc |  18 ++
 .../ignite/internal/cli/CliIntegrationTest.java    |   3 +-
 .../rebalance/ItRebalanceDistributedTest.java      |   3 +-
 modules/partition-replicator/build.gradle          |   3 +
 .../partition/replicator/fixtures/Node.java        |   3 +-
 .../ItZonePartitionRaftListenerRecoveryTest.java   |   4 +-
 .../PartitionReplicaLifecycleManager.java          |  19 +-
 .../partition/replicator/ZoneResourcesManager.java |  10 +-
 .../raft/snapshot/PartitionSnapshotStorage.java    |  25 +-
 .../snapshot/incoming/IncomingSnapshotCopier.java  |  23 +-
 .../metrics/RaftSnapshotsMetricsSource.java        | 280 +++++++++++++++++++++
 .../raft/snapshot/outgoing/OutgoingSnapshot.java   |   9 +-
 .../snapshot/outgoing/OutgoingSnapshotReader.java  |  10 +-
 .../outgoing/OutgoingSnapshotsManager.java         |   2 +
 .../PartitionReplicaLifecycleManagerTest.java      |   4 +-
 .../PartitionSnapshotStorageFactoryTest.java       |   4 +-
 .../snapshot/PartitionSnapshotStorageTest.java     |   4 +-
 .../incoming/IncomingSnapshotCopierTest.java       | 151 ++++++++++-
 .../metrics/RaftSnapshotsMetricsSourceTest.java    |  65 +++++
 .../outgoing/OutgoingSnapshotCommonTest.java       |  54 +++-
 .../OutgoingSnapshotMvDataStreamingTest.java       |  15 +-
 .../outgoing/OutgoingSnapshotReaderTest.java       |  14 +-
 .../OutgoingSnapshotTxDataStreamingTest.java       |   8 +-
 .../outgoing/OutgoingSnapshotsManagerTest.java     |   8 +-
 .../rest/metrics/ItMetricControllerTest.java       |   3 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   3 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 .../distributed/TableManagerRecoveryTest.java      |  13 +-
 28 files changed, 718 insertions(+), 43 deletions(-)

diff --git a/docs/_docs/administrators-guide/metrics/metrics-list.adoc 
b/docs/_docs/administrators-guide/metrics/metrics-list.adoc
index abe2bbe1767..ffacf503684 100644
--- a/docs/_docs/administrators-guide/metrics/metrics-list.adoc
+++ b/docs/_docs/administrators-guide/metrics/metrics-list.adoc
@@ -296,3 +296,21 @@ Transaction metrics.
 | LocalUnrebalancedPartitionsCount | The number of partitions that should be 
moved to this node.
 | TotalUnrebalancedPartitionsCount | The total number of partitions that 
should be moved to a new owner.
 |=======================================================================
+
+== raft.snapshots
+
+Metrics related to Raft snapshots of partition replicas.
+
+[width="100%",cols="20%,80%",opts="header"]
+|=======================================================================
+| Metric name | Description
+
+| IncomingSnapshots | The number of incoming Raft snapshots in progress.
+| IncomingSnapshotsLoadingMeta | The number of incoming Raft snapshots loading 
metadata.
+| IncomingSnapshotsWaitingCatalog | The number of incoming Raft snapshots 
waiting for catalog.
+| IncomingSnapshotsPreparingStorages | The number of incoming Raft snapshots 
preparing storages.
+| IncomingSnapshotsPreparingIndexForBuild | The number of incoming Raft 
snapshots preparing indexes for build.
+| IncomingSnapshotsLoadingMvData | The number of incoming Raft snapshots 
loading multi-versioned data.
+| IncomingSnapshotsLoadingTxMeta | The number of incoming Raft snapshots 
loading transaction metadata.
+| OutgoingSnapshots | The number of outgoing Raft snapshots in progress.
+|=======================================================================
diff --git 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
index 7f246c67b79..7fb227df5e3 100644
--- 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
+++ 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
@@ -82,7 +82,8 @@ public abstract class CliIntegrationTest extends 
ClusterPerClassIntegrationTest
             new MetricSource().name("placement-driver").enabled(true),
             new MetricSource().name("resource.vacuum").enabled(true),
             new MetricSource().name("clock.service").enabled(true),
-            new MetricSource().name("index.builder").enabled(true)
+            new MetricSource().name("index.builder").enabled(true),
+            new MetricSource().name("raft.snapshots").enabled(true)
     };
 
     /** Correct ignite jdbc url. */
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 860617f9185..06feac04799 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1556,7 +1556,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     txManager,
                     schemaManager,
                     dataStorageMgr,
-                    outgoingSnapshotManager
+                    outgoingSnapshotManager,
+                    metricManager
             );
 
             tableManager = new TableManager(
diff --git a/modules/partition-replicator/build.gradle 
b/modules/partition-replicator/build.gradle
index 896a6cbe843..8398351ed41 100644
--- a/modules/partition-replicator/build.gradle
+++ b/modules/partition-replicator/build.gradle
@@ -52,6 +52,7 @@ dependencies {
     testImplementation testFixtures(project(':ignite-core'))
     testImplementation testFixtures(project(':ignite-low-watermark'))
     testImplementation testFixtures(project(':ignite-metastorage'))
+    testImplementation testFixtures(project(':ignite-metrics'))
     testImplementation testFixtures(project(':ignite-schema'))
     testImplementation testFixtures(project(':ignite-storage-api'))
     testImplementation testFixtures(project(':ignite-table'))
@@ -60,6 +61,8 @@ dependencies {
     testImplementation testFixtures(project(':ignite-raft'))
     testImplementation testFixtures(project(':ignite-distribution-zones'))
 
+    testImplementation libs.awaitility
+
     integrationTestImplementation 
testFixtures(project(':ignite-cluster-management'))
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation 
testFixtures(project(':ignite-configuration'))
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index d18eae051cd..d2596ffc661 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -744,7 +744,8 @@ public class Node {
                 txManager,
                 schemaManager,
                 dataStorageMgr,
-                outgoingSnapshotsManager
+                outgoingSnapshotsManager,
+                metricManager
         );
 
         resourceVacuumManager = new ResourceVacuumManager(
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
index 5c7621af9e5..1aa7266a88d 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
@@ -83,6 +83,7 @@ import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSn
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
 import org.apache.ignite.internal.raft.Loza;
@@ -311,7 +312,8 @@ class ItZonePartitionRaftListenerRecoveryTest extends 
IgniteAbstractTest {
                 catalogService,
                 failureProcessor,
                 executor,
-                new LogStorageAccessImpl(replicaManager)
+                new LogStorageAccessImpl(replicaManager),
+                new RaftSnapshotsMetricsSource()
         );
     }
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index a423ffea41e..d96825663c8 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -127,6 +127,7 @@ import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metastorage.dsl.Condition;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.RecipientLeftException;
 import org.apache.ignite.internal.network.TopologyService;
@@ -265,6 +266,8 @@ public class PartitionReplicaLifecycleManager extends
 
     private final ZoneResourcesManager zoneResourcesManager;
 
+    private final MetricManager metricManager;
+
     private final ReliableCatalogVersions reliableCatalogVersions;
 
     private final EventListener<CreateZoneEventParameters> 
onCreateZoneListener = this::onCreateZone;
@@ -295,6 +298,7 @@ public class PartitionReplicaLifecycleManager extends
      * @param schemaManager Schema manager.
      * @param dataStorageManager Data storage manager.
      * @param outgoingSnapshotsManager Outgoing snapshots manager.
+     * @param metricManager Metric manager.
      */
     public PartitionReplicaLifecycleManager(
             CatalogService catalogService,
@@ -316,7 +320,8 @@ public class PartitionReplicaLifecycleManager extends
             TxManager txManager,
             SchemaManager schemaManager,
             DataStorageManager dataStorageManager,
-            OutgoingSnapshotsManager outgoingSnapshotsManager
+            OutgoingSnapshotsManager outgoingSnapshotsManager,
+            MetricManager metricManager
     ) {
         this(
                 catalogService,
@@ -346,7 +351,8 @@ public class PartitionReplicaLifecycleManager extends
                         failureProcessor,
                         partitionOperationsExecutor,
                         replicaMgr
-                )
+                ),
+                metricManager
         );
     }
 
@@ -370,7 +376,8 @@ public class PartitionReplicaLifecycleManager extends
             TxManager txManager,
             SchemaManager schemaManager,
             DataStorageManager dataStorageManager,
-            ZoneResourcesManager zoneResourcesManager
+            ZoneResourcesManager zoneResourcesManager,
+            MetricManager metricManager
     ) {
         this.catalogService = catalogService;
         this.replicaMgr = replicaMgr;
@@ -390,6 +397,7 @@ public class PartitionReplicaLifecycleManager extends
         this.schemaManager = schemaManager;
         this.dataStorageManager = dataStorageManager;
         this.zoneResourcesManager = zoneResourcesManager;
+        this.metricManager = metricManager;
 
         rebalanceRetryDelayConfiguration = new 
SystemDistributedConfigurationPropertyHolder<>(
                 systemDistributedConfiguration,
@@ -431,6 +439,9 @@ public class PartitionReplicaLifecycleManager extends
 
         
executorInclinedPlacementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
 onPrimaryReplicaExpiredListener);
 
+        
metricManager.registerSource(zoneResourcesManager.snapshotsMetricsSource());
+        metricManager.enable(zoneResourcesManager.snapshotsMetricsSource());
+
         return processZonesAndAssignmentsOnStart;
     }
 
@@ -1657,6 +1668,8 @@ public class PartitionReplicaLifecycleManager extends
         }
 
         try {
+            
metricManager.unregisterSource(zoneResourcesManager.snapshotsMetricsSource());
+
             IgniteUtils.closeAllManually(zoneResourcesManager);
         } catch (Exception e) {
             return failedFuture(e);
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index 51c8efe2cea..969573e705e 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -34,6 +34,7 @@ import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageA
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -70,6 +71,8 @@ public class ZoneResourcesManager implements 
ManuallyCloseable {
 
     private final ReplicaManager replicaManager;
 
+    private final RaftSnapshotsMetricsSource snapshotsMetricsSource = new 
RaftSnapshotsMetricsSource();
+
     /** Map from zone IDs to their resource holders. */
     private final Map<Integer, ZoneResources> resourcesByZoneId = new 
ConcurrentHashMap<>();
 
@@ -128,7 +131,8 @@ public class ZoneResourcesManager implements 
ManuallyCloseable {
                 catalogService,
                 failureProcessor,
                 partitionOperationsExecutor,
-                new LogStorageAccessImpl(replicaManager)
+                new LogStorageAccessImpl(replicaManager),
+                snapshotsMetricsSource
         );
 
         var zonePartitionResources = new ZonePartitionResources(
@@ -218,6 +222,10 @@ public class ZoneResourcesManager implements 
ManuallyCloseable {
         return resources.txStateStorage.getPartitionStorage(partitionId);
     }
 
+    RaftSnapshotsMetricsSource snapshotsMetricsSource() {
+        return snapshotsMetricsSource;
+    }
+
     private static class ZoneResources {
 
         final TxStateStorage txStateStorage;
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
index 6cf34dde4a1..705c598a1ef 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotReader;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
@@ -100,6 +101,8 @@ public class PartitionSnapshotStorage {
 
     private final LogStorageAccess logStorage;
 
+    private final RaftSnapshotsMetricsSource snapshotsMetricsSource;
+
     /** Constructor. */
     public PartitionSnapshotStorage(
             ZonePartitionKey partitionKey,
@@ -109,7 +112,8 @@ public class PartitionSnapshotStorage {
             CatalogService catalogService,
             FailureProcessor failureProcessor,
             Executor incomingSnapshotsExecutor,
-            LogStorageAccess logStorage
+            LogStorageAccess logStorage,
+            RaftSnapshotsMetricsSource snapshotsMetricsSource
     ) {
         this(
                 partitionKey,
@@ -120,7 +124,8 @@ public class PartitionSnapshotStorage {
                 failureProcessor,
                 incomingSnapshotsExecutor,
                 DEFAULT_WAIT_FOR_METADATA_CATCHUP_MS,
-                logStorage
+                logStorage,
+                snapshotsMetricsSource
         );
     }
 
@@ -134,7 +139,8 @@ public class PartitionSnapshotStorage {
             FailureProcessor failureProcessor,
             Executor incomingSnapshotsExecutor,
             long waitForMetadataCatchupMs,
-            LogStorageAccess logStorage
+            LogStorageAccess logStorage,
+            RaftSnapshotsMetricsSource snapshotsMetricsSource
     ) {
         this.partitionKey = partitionKey;
         this.topologyService = topologyService;
@@ -145,6 +151,7 @@ public class PartitionSnapshotStorage {
         this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
         this.waitForMetadataCatchupMs = waitForMetadataCatchupMs;
         this.logStorage = logStorage;
+        this.snapshotsMetricsSource = snapshotsMetricsSource;
     }
 
     public ZonePartitionKey partitionKey() {
@@ -231,7 +238,13 @@ public class PartitionSnapshotStorage {
 
         SnapshotUri snapshotUri = SnapshotUri.fromStringUri(uri);
 
-        var copier = new IncomingSnapshotCopier(this, snapshotUri, 
incomingSnapshotsExecutor, waitForMetadataCatchupMs) {
+        var copier = new IncomingSnapshotCopier(
+                this,
+                snapshotUri,
+                incomingSnapshotsExecutor,
+                waitForMetadataCatchupMs,
+                snapshotsMetricsSource
+        ) {
             @Override
             public void close() {
                 try {
@@ -257,7 +270,7 @@ public class PartitionSnapshotStorage {
 
         startSnapshotOperation(snapshotId);
 
-        return new OutgoingSnapshotReader(snapshotId, this) {
+        return new OutgoingSnapshotReader(snapshotId, this, 
snapshotsMetricsSource) {
             @Override
             public void close() throws IOException {
                 try {
@@ -282,6 +295,8 @@ public class PartitionSnapshotStorage {
         LOG.info("Finishing outgoing snapshot [partitionKey={}, 
snapshotId={}]", partitionKey, snapshotId);
 
         synchronized (snapshotOperationLock) {
+            LOG.info("Finishing outgoing snapshot [partitionKey={}, 
snapshotId={}]", partitionKey, snapshotId);
+
             CompletableFuture<Void> operationFuture = 
ongoingSnapshotOperations.remove(snapshotId);
 
             assert operationFuture != null :
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
index 50bc86dbc4f..f9ffb208f04 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -74,6 +74,7 @@ import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMv
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.raft.RaftGroupConfigurationSerializer;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -137,6 +138,8 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
 
     private final IncomingSnapshotStats snapshotStats = new 
IncomingSnapshotStats();
 
+    private final RaftSnapshotsMetricsSource snapshotsMetricsSource;
+
     /**
      * Constructor.
      *
@@ -145,23 +148,27 @@ public class IncomingSnapshotCopier extends 
SnapshotCopier {
      * @param executor Thread pool for IO operations.
      * @param waitForMetadataCatchupMs How much time to allow for metadata on 
this node to reach the catalog version required by an
      *         incoming snapshot.
+     * @param snapshotsMetricsSource Raft snapshots metrics source.
      */
     public IncomingSnapshotCopier(
             PartitionSnapshotStorage partitionSnapshotStorage,
             SnapshotUri snapshotUri,
             Executor executor,
-            long waitForMetadataCatchupMs
+            long waitForMetadataCatchupMs,
+            RaftSnapshotsMetricsSource snapshotsMetricsSource
     ) {
         this.partitionSnapshotStorage = partitionSnapshotStorage;
         this.snapshotUri = snapshotUri;
         this.executor = executor;
         this.throttledLogger = Loggers.toThrottledLogger(LOG, executor);
         this.waitForMetadataCatchupMs = waitForMetadataCatchupMs;
+        this.snapshotsMetricsSource = snapshotsMetricsSource;
     }
 
     @Override
     public void start() {
         snapshotStats.onSnapshotInstallationStart();
+        snapshotsMetricsSource.onSnapshotInstallationStart();
 
         if (LOG.isInfoEnabled()) {
             LOG.info("Rebalance is started [snapshotId={}, {}]", 
snapshotUri.snapshotId, createPartitionInfo());
@@ -226,6 +233,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
 
     private CompletableFuture<?> 
waitForMetadataWithTimeout(PartitionSnapshotMeta snapshotMeta) {
         snapshotStats.onWaitingCatalogPhaseStart();
+        snapshotsMetricsSource.onWaitingCatalogPhaseStart();
 
         if (LOG.isInfoEnabled()) {
             LOG.info(
@@ -243,6 +251,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
         return anyOf(metadataReadyFuture, readinessTimeoutFuture)
                 .whenComplete((ignored, throwable) -> {
                     snapshotStats.onWaitingCatalogPhaseEnd();
+                    snapshotsMetricsSource.onWaitingCatalogPhaseEnd();
 
                     if (LOG.isInfoEnabled()) {
                         LOG.info(
@@ -351,6 +360,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
         }
 
         snapshotStats.onLoadSnapshotPhaseStart();
+        snapshotsMetricsSource.onLoadSnapshotMetaPhaseStart();
 
         if (LOG.isInfoEnabled()) {
             LOG.info("Start loading snapshot meta [snapshotId={}, {}]", 
snapshotUri.snapshotId, createPartitionInfo());
@@ -365,6 +375,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
                 PartitionSnapshotMeta snapshotMeta = ((SnapshotMetaResponse) 
response).meta();
 
                 snapshotStats.onLoadSnapshotPhaseEnd();
+                snapshotsMetricsSource.onLoadSnapshotMetaPhaseEnd();
 
                 if (LOG.isInfoEnabled()) {
                     LOG.info(
@@ -414,6 +425,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
         }
 
         snapshotStats.onLoadMvDataPhaseStart();
+        snapshotsMetricsSource.onLoadMvDataPhaseStart();
 
         if (LOG.isInfoEnabled()) {
             LOG.info(
@@ -453,6 +465,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
 
                 if (snapshotMvDataResponse.finish()) {
                     snapshotStats.onLoadMvDataPhaseEnd();
+                    snapshotsMetricsSource.onLoadMvDataPhaseEnd();
 
                     if (LOG.isInfoEnabled()) {
                         LOG.info(
@@ -496,6 +509,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
         }
 
         snapshotStats.onLoadTxMetasPhaseStart();
+        snapshotsMetricsSource.onLoadTxMetasPhaseStart();
 
         if (LOG.isInfoEnabled()) {
             LOG.info(
@@ -537,6 +551,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
 
                 if (snapshotTxDataResponse.finish()) {
                     snapshotStats.onLoadTxMetasPhaseEnd();
+                    snapshotsMetricsSource.onLoadTxMetasPhaseEnd();
 
                     if (LOG.isInfoEnabled()) {
                         LOG.info(
@@ -578,7 +593,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
      *         successful.
      */
     private CompletableFuture<Void> completeRebalance(SnapshotContext 
snapshotContext, @Nullable Throwable throwable) {
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-27428
         snapshotStats.onSnapshotInstallationEnd();
+        snapshotsMetricsSource.onSnapshotInstallationEnd();
 
         if (!busyLock.enterBusy()) {
             if (isOk()) {
@@ -730,6 +747,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
         }
 
         snapshotStats.onSetRowIdToBuildPhaseStart();
+        snapshotsMetricsSource.onSetRowIdToBuildPhaseStart();
 
         try {
             Map<Integer, UUID> nextRowUuidToBuildByIndexId = 
snapshotContext.meta.nextRowIdToBuildByIndexId();
@@ -772,6 +790,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
             }
 
             snapshotStats.onSetRowIdToBuildPhaseEnd();
+            snapshotsMetricsSource.onSetRowIdToBuildPhaseEnd();
 
             if (LOG.isInfoEnabled()) {
                 LOG.info("Finished setting next row ID for index building 
[snapshotId={}, {}, totalTime={}ms]",
@@ -817,6 +836,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
         }
 
         snapshotStats.onPreparingStoragePhaseStart();
+        snapshotsMetricsSource.onPreparingStoragePhaseStart();
 
         if (LOG.isInfoEnabled()) {
             LOG.info(
@@ -833,6 +853,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
             ).thenComposeAsync(unused -> 
startRebalanceForReplicationLogStorages(snapshotContext), executor)
                     .whenComplete((ignore, throwable) -> {
                         snapshotStats.onPreparingStoragePhaseEnd();
+                        snapshotsMetricsSource.onPreparingStoragePhaseEnd();
 
                         if (LOG.isInfoEnabled()) {
                             LOG.info(
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSource.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSource.java
new file mode 100644
index 00000000000..db2ee6de014
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSource.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntSupplier;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.IntGauge;
+import org.apache.ignite.internal.metrics.Metric;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource.Holder;
+
+/**
+ * Metric source that exposes counters related to Raft snapshots lifecycle for 
partition replicator.
+ *
+ * <p>The source is registered under the name {@code raft.snapshots}. It 
maintains the number of currently
+ * running incoming and outgoing snapshots and per-phase counters for the 
installation of incoming snapshots.
+ * These counters are intended to help understand where time is spent during 
snapshot installation and
+ * whether there are any bottlenecks (for example, waiting for catalog, 
loading multi-versioned data, etc.).
+ */
+public class RaftSnapshotsMetricsSource extends AbstractMetricSource<Holder> {
+    private final AtomicInteger incomingSnapshotsCounter = new AtomicInteger();
+
+    private final AtomicInteger snapshotsLoadingMetaCounter = new 
AtomicInteger();
+
+    private final AtomicInteger snapshotsWaitingCatalogCounter = new 
AtomicInteger();
+
+    private final AtomicInteger snapshotsPreparingStoragesCounter = new 
AtomicInteger();
+
+    private final AtomicInteger snapshotsPreparingIndexForBuildCounter = new 
AtomicInteger();
+
+    private final AtomicInteger snapshotsLoadingMvDataCounter = new 
AtomicInteger();
+
+    private final AtomicInteger snapshotsLoadingTxMetaCounter = new 
AtomicInteger();
+
+    private final AtomicInteger outgoingSnapshotsCounter = new AtomicInteger();
+
+    /**
+     * Creates a new metric source with the name {@code raft.snapshots}.
+     */
+    public RaftSnapshotsMetricsSource() {
+        super("raft.snapshots");
+    }
+
+    @Override
+    protected Holder createHolder() {
+        return new Holder(
+                incomingSnapshotsCounter::get,
+                snapshotsLoadingMetaCounter::get,
+                snapshotsWaitingCatalogCounter::get,
+                snapshotsPreparingStoragesCounter::get,
+                snapshotsPreparingIndexForBuildCounter::get,
+                snapshotsLoadingMvDataCounter::get,
+                snapshotsLoadingTxMetaCounter::get,
+                outgoingSnapshotsCounter::get
+        );
+    }
+
+    /**
+     * Marks the start of an incoming snapshot installation.
+     */
+    public void onSnapshotInstallationStart() {
+        incomingSnapshotsCounter.incrementAndGet();
+    }
+
+    /**
+     * Marks the end of an incoming snapshot installation.
+     */
+    public void onSnapshotInstallationEnd() {
+        incomingSnapshotsCounter.decrementAndGet();
+    }
+
+    /**
+     * Marks the beginning of the "load snapshot metadata" phase during 
incoming snapshot installation.
+     */
+    public void onLoadSnapshotMetaPhaseStart() {
+        snapshotsLoadingMetaCounter.incrementAndGet();
+    }
+
+    /**
+     * Marks the end of the "load snapshot metadata" phase during incoming 
snapshot installation.
+     */
+    public void onLoadSnapshotMetaPhaseEnd() {
+        snapshotsLoadingMetaCounter.decrementAndGet();
+    }
+
+    /**
+     * Marks the beginning of the phase where the node waits for catalog to be 
ready/apply updates
+     * for the incoming snapshot installation.
+     */
+    public void onWaitingCatalogPhaseStart() {
+        snapshotsWaitingCatalogCounter.incrementAndGet();
+    }
+
+    /**
+     * Marks the end of the "waiting for catalog" phase during incoming 
snapshot installation.
+     */
+    public void onWaitingCatalogPhaseEnd() {
+        snapshotsWaitingCatalogCounter.decrementAndGet();
+    }
+
+    /**
+     * Marks the beginning of the storage preparation phase for incoming 
snapshot installation.
+     */
+    public void onPreparingStoragePhaseStart() {
+        snapshotsPreparingStoragesCounter.incrementAndGet();
+    }
+
+    /**
+     * Marks the end of the storage preparation phase for incoming snapshot 
installation.
+     */
+    public void onPreparingStoragePhaseEnd() {
+        snapshotsPreparingStoragesCounter.decrementAndGet();
+    }
+
+    /**
+     * Marks the beginning of the phase where MV (multi-version) data is 
loaded.
+     */
+    public void onLoadMvDataPhaseStart() {
+        snapshotsLoadingMvDataCounter.incrementAndGet();
+    }
+
+    /**
+     * Marks the end of the phase where MV (multi-version) data is loaded.
+     */
+    public void onLoadMvDataPhaseEnd() {
+        snapshotsLoadingMvDataCounter.decrementAndGet();
+    }
+
+    /**
+     * Marks the beginning of the phase where transaction metadata is loaded 
from the snapshot.
+     */
+    public void onLoadTxMetasPhaseStart() {
+        snapshotsLoadingTxMetaCounter.incrementAndGet();
+    }
+
+    /**
+     * Marks the end of the phase where transaction metadata is loaded from 
the snapshot.
+     */
+    public void onLoadTxMetasPhaseEnd() {
+        snapshotsLoadingTxMetaCounter.decrementAndGet();
+    }
+
+    /**
+     * Marks the beginning of preparing indexes for build as part of incoming 
snapshot installation.
+     */
+    public void onSetRowIdToBuildPhaseStart() {
+        snapshotsPreparingIndexForBuildCounter.incrementAndGet();
+    }
+
+    /**
+     * Marks the end of preparing indexes for build as part of incoming 
snapshot installation.
+     */
+    public void onSetRowIdToBuildPhaseEnd() {
+        snapshotsPreparingIndexForBuildCounter.decrementAndGet();
+    }
+
+    /**
+     * Marks the start of an outgoing snapshot creation/streaming.
+     */
+    public void onOutgoingSnapshotStart() {
+        outgoingSnapshotsCounter.incrementAndGet();
+    }
+
+    /**
+     * Marks the end of an outgoing snapshot creation/streaming.
+     */
+    public void onOutgoingSnapshotEnd() {
+        outgoingSnapshotsCounter.decrementAndGet();
+    }
+
+    /**
+     * Container of metrics exposed by {@link RaftSnapshotsMetricsSource}.
+     */
+    public static class Holder implements AbstractMetricSource.Holder<Holder> {
+        private final IntGauge incomingSnapshots;
+
+        private final IntGauge snapshotsLoadingMeta;
+
+        private final IntGauge snapshotsWaitingCatalog;
+
+        private final IntGauge snapshotsPreparingStorages;
+
+        private final IntGauge snapshotsPreparingIndexForBuild;
+
+        private final IntGauge snapshotsLoadingMvData;
+
+        private final IntGauge snapshotsLoadingTxMeta;
+
+        private final IntGauge outgoingSnapshots;
+
+        private Holder(
+                IntSupplier incomingSnapshotsSupplier,
+                IntSupplier snapshotsLoadingMetaSupplier,
+                IntSupplier snapshotsWaitingCatalogSupplier,
+                IntSupplier snapshotsPreparingStoragesSupplier,
+                IntSupplier snapshotsPreparingIndexForBuildSupplier,
+                IntSupplier snapshotsLoadingMvDataSupplier,
+                IntSupplier snapshotsLoadingTxMetaSupplier,
+                IntSupplier outgoingSnapshotsSupplier
+        ) {
+            incomingSnapshots = new IntGauge(
+                    "IncomingSnapshots",
+                    "Incoming Raft snapshots in progress",
+                    incomingSnapshotsSupplier
+            );
+
+            snapshotsLoadingMeta = new IntGauge(
+                    "IncomingSnapshotsLoadingMeta",
+                    "Incoming Raft snapshots loading metadata",
+                    snapshotsLoadingMetaSupplier
+            );
+
+            snapshotsWaitingCatalog = new IntGauge(
+                    "IncomingSnapshotsWaitingCatalog",
+                    "Incoming Raft snapshots waiting for catalog",
+                    snapshotsWaitingCatalogSupplier
+            );
+
+            snapshotsPreparingStorages = new IntGauge(
+                    "IncomingSnapshotsPreparingStorages",
+                    "Incoming Raft snapshots preparing storages",
+                    snapshotsPreparingStoragesSupplier
+            );
+
+            snapshotsPreparingIndexForBuild = new IntGauge(
+                    "IncomingSnapshotsPreparingIndexForBuild",
+                    "Incoming Raft snapshots preparing indexes for build",
+                    snapshotsPreparingIndexForBuildSupplier
+            );
+
+            snapshotsLoadingMvData = new IntGauge(
+                    "IncomingSnapshotsLoadingMvData",
+                    "Incoming Raft snapshots loading multi-versioned data",
+                    snapshotsLoadingMvDataSupplier
+            );
+
+            snapshotsLoadingTxMeta = new IntGauge(
+                    "IncomingSnapshotsLoadingTxMeta",
+                    "Incoming Raft snapshots loading transaction metadata",
+                    snapshotsLoadingTxMetaSupplier
+            );
+
+            outgoingSnapshots = new IntGauge(
+                    "OutgoingSnapshots",
+                    "Outgoing Raft snapshots in progress",
+                    outgoingSnapshotsSupplier
+            );
+        }
+
+        @Override
+        public Iterable<Metric> metrics() {
+            return List.of(
+                    incomingSnapshots,
+                    snapshotsLoadingMeta,
+                    snapshotsWaitingCatalog,
+                    snapshotsPreparingStorages,
+                    snapshotsPreparingIndexForBuild,
+                    snapshotsLoadingMvData,
+                    snapshotsLoadingTxMeta,
+                    outgoingSnapshots
+            );
+        }
+    }
+}
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
index 026c6066822..2b82930b213 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -55,6 +55,7 @@ import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKe
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -145,6 +146,8 @@ public class OutgoingSnapshot {
 
     private final OutgoingSnapshotStats snapshotStats;
 
+    private final RaftSnapshotsMetricsSource snapshotsMetricsSource;
+
     /**
      * Creates a new instance.
      */
@@ -153,7 +156,8 @@ public class OutgoingSnapshot {
             PartitionKey partitionKey,
             Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId,
             PartitionTxStateAccess txState,
-            CatalogService catalogService
+            CatalogService catalogService,
+            RaftSnapshotsMetricsSource snapshotMetricsSource
     ) {
         this.id = id;
         this.partitionKey = partitionKey;
@@ -161,6 +165,7 @@ public class OutgoingSnapshot {
         this.txState = txState;
         this.catalogService = catalogService;
         this.snapshotStats = new OutgoingSnapshotStats(id, partitionKey);
+        this.snapshotsMetricsSource = snapshotMetricsSource;
     }
 
     /**
@@ -187,6 +192,7 @@ public class OutgoingSnapshot {
 
         try {
             snapshotStats.onSnapshotStart();
+            snapshotsMetricsSource.onOutgoingSnapshotStart();
 
             int catalogVersion = catalogService.latestCatalogVersion();
 
@@ -662,6 +668,7 @@ public class OutgoingSnapshot {
         busyLock.block();
 
         snapshotStats.onSnapshotEnd();
+        snapshotsMetricsSource.onOutgoingSnapshotEnd();
 
         snapshotStats.logSnapshotStats();
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReader.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReader.java
index 79b30bb0130..bee32530b0a 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReader.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReader.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
 import org.apache.ignite.raft.jraft.rpc.Message;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
@@ -45,7 +46,11 @@ public class OutgoingSnapshotReader extends SnapshotReader {
     /**
      * Constructor.
      */
-    public OutgoingSnapshotReader(UUID snapshotId, PartitionSnapshotStorage 
snapshotStorage) {
+    public OutgoingSnapshotReader(
+            UUID snapshotId,
+            PartitionSnapshotStorage snapshotStorage,
+            RaftSnapshotsMetricsSource snapshotMetricsSource
+    ) {
         this.snapshotStorage = snapshotStorage;
 
         id = snapshotId;
@@ -55,7 +60,8 @@ public class OutgoingSnapshotReader extends SnapshotReader {
                 snapshotStorage.partitionKey(),
                 snapshotStorage.partitionsByTableId(),
                 snapshotStorage.txState(),
-                snapshotStorage.catalogService()
+                snapshotStorage.catalogService(),
+                snapshotMetricsSource
         );
 
         LOG.info("Starting snapshot reader [{}, snapshotId={}]", 
createPartitionInfo(), id);
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
index 313bd6a5726..fde77ca77bd 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
@@ -133,6 +133,8 @@ public class OutgoingSnapshotsManager implements 
PartitionsSnapshots, IgniteComp
      * @param outgoingSnapshot Outgoing snapshot.
      */
     void startOutgoingSnapshot(UUID snapshotId, OutgoingSnapshot 
outgoingSnapshot) {
+        LOG.info("Starting outgoing snapshot [snapshotId={}]", snapshotId);
+
         snapshots.put(snapshotId, outgoingSnapshot);
 
         PartitionSnapshotsImpl partitionSnapshots = 
getPartitionSnapshots(outgoingSnapshot.partitionKey());
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index d892ee16fda..83b81368122 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.TopologyService;
@@ -282,7 +283,8 @@ class PartitionReplicaLifecycleManagerTest extends 
BaseIgniteAbstractTest {
                 txManager,
                 schemaManager,
                 dataStorageManager,
-                zoneResourcesManager
+                zoneResourcesManager,
+                new NoOpMetricManager()
         );
 
         var componentContext = new ComponentContext();
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
index 6373adc63af..4a95b5157b9 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Executor;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.network.TopologyService;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.startup.StartupPartitionSnapshotReader;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
@@ -50,7 +51,8 @@ class PartitionSnapshotStorageFactoryTest extends 
BaseIgniteAbstractTest {
             mock(CatalogService.class),
             mock(FailureProcessor.class),
             mock(Executor.class),
-            mock(LogStorageAccess.class)
+            mock(LogStorageAccess.class),
+            new RaftSnapshotsMetricsSource()
     );
 
     @Mock
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
index 014b76f1375..7f93c183908 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Executor;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.network.TopologyService;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -60,7 +61,8 @@ public class PartitionSnapshotStorageTest extends 
BaseIgniteAbstractTest {
             mock(CatalogService.class),
             mock(FailureProcessor.class),
             mock(Executor.class),
-            mock(LogStorageAccess.class)
+            mock(LogStorageAccess.class),
+            new RaftSnapshotsMetricsSource()
     );
 
     @Test
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 00aa96c7ddb..6834e60daa6 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.StreamSupport.stream;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
 import static 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.SnapshotMetaUtils.snapshotMetaAt;
@@ -30,8 +31,10 @@ import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.apache.ignite.internal.tx.TxState.COMMITTED;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -74,16 +77,22 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
 import org.apache.ignite.internal.lowwatermark.message.GetLowWatermarkRequest;
+import org.apache.ignite.internal.lowwatermark.message.GetLowWatermarkResponse;
 import 
org.apache.ignite.internal.lowwatermark.message.LowWatermarkMessagesFactory;
+import org.apache.ignite.internal.metrics.Metric;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import 
org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataRequest;
+import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse.ResponseEntry;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataRequest;
+import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataResponse;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfo;
 import 
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfoSerializer;
@@ -92,6 +101,7 @@ import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSn
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
@@ -386,6 +396,15 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
             MvTableStorage incomingTableStorage,
             TxStateStorage incomingTxStateStorage,
             MessagingService messagingService
+    ) {
+        return createPartitionSnapshotStorage(incomingTableStorage, 
incomingTxStateStorage, messagingService, catalogService);
+    }
+
+    private PartitionSnapshotStorage createPartitionSnapshotStorage(
+            MvTableStorage incomingTableStorage,
+            TxStateStorage incomingTxStateStorage,
+            MessagingService messagingService,
+            CatalogService catalogService
     ) {
         TopologyService topologyService = mock(TopologyService.class);
 
@@ -404,7 +423,8 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
                 mock(FailureProcessor.class),
                 executorService,
                 0,
-                new LogStorageAccessImpl(replicaManager)
+                new LogStorageAccessImpl(replicaManager),
+                new RaftSnapshotsMetricsSource()
         );
 
         storage.addMvPartition(TABLE_ID, spy(new PartitionMvStorageAccessImpl(
@@ -740,7 +760,8 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
                 partitionSnapshotStorage,
                 SnapshotUri.fromStringUri(SnapshotUri.toStringUri(snapshotId, 
NODE_NAME)),
                 mock(Executor.class),
-                0
+                0,
+                new RaftSnapshotsMetricsSource()
         );
 
         Thread anotherThread = new Thread(copier::cancel);
@@ -817,6 +838,90 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
         assertThatTargetStoragesAreEmpty(incomingMvTableStorage, 
incomingTxStateStorage);
     }
 
+    @Test
+    void metricsCalculateCorrectly() throws InterruptedException {
+        incomingMvTableStorage.createMvPartition(PARTITION_ID);
+        incomingTxStateStorage.getOrCreatePartitionStorage(PARTITION_ID);
+
+        PartitionSnapshotMeta meta = mock(PartitionSnapshotMeta.class);
+
+        when(meta.requiredCatalogVersion()).thenReturn(1);
+
+        SnapshotMetaResponse metaResponse = mock(SnapshotMetaResponse.class);
+
+        when(metaResponse.meta()).thenReturn(meta);
+
+        CompletableFuture<NetworkMessage> loadSnapshotMetaFuture = new 
CompletableFuture<>();
+
+        CompletableFuture<NetworkMessage> loadMvDataFuture = new 
CompletableFuture<>();
+
+        CompletableFuture<NetworkMessage> loadTxMetaFuture = new 
CompletableFuture<>();
+
+        SnapshotMvDataResponse mvDataResponse = 
mock(SnapshotMvDataResponse.class);
+        when(mvDataResponse.finish()).thenReturn(true);
+
+        SnapshotTxDataResponse txMetaResponse = 
mock(SnapshotTxDataResponse.class);
+        when(txMetaResponse.finish()).thenReturn(true);
+
+        MessagingService messagingService = 
messagingServiceForMetrics(loadSnapshotMetaFuture, loadMvDataFuture, 
loadTxMetaFuture);
+
+        CompletableFuture<Void> catalogReadyFuture = new CompletableFuture<>();
+
+        CatalogService catalogService = mock(CatalogService.class);
+
+        
when(catalogService.catalogReadyFuture(anyInt())).thenReturn(catalogReadyFuture);
+
+        PartitionSnapshotStorage partitionSnapshotStorage = 
createPartitionSnapshotStorage(
+                incomingMvTableStorage,
+                incomingTxStateStorage,
+                messagingService,
+                catalogService
+        );
+
+        var snapshotMetricSource = new RaftSnapshotsMetricsSource();
+
+        snapshotMetricSource.enable();
+
+        IncomingSnapshotCopier copier = new IncomingSnapshotCopier(
+                partitionSnapshotStorage,
+                SnapshotUri.fromStringUri(SnapshotUri.toStringUri(snapshotId, 
NODE_NAME)),
+                executorService,
+                1000,
+                snapshotMetricSource
+        );
+
+        waitTillMetricHasValue(snapshotMetricSource, "IncomingSnapshots", "0");
+        waitTillMetricHasValue(snapshotMetricSource, 
"IncomingSnapshotsLoadingMeta", "0");
+
+        copier.start();
+
+        waitTillMetricHasValue(snapshotMetricSource, "IncomingSnapshots", "1");
+        waitTillMetricHasValue(snapshotMetricSource, 
"IncomingSnapshotsLoadingMeta", "1");
+
+        loadSnapshotMetaFuture.complete(metaResponse);
+
+        waitTillMetricHasValue(snapshotMetricSource, 
"IncomingSnapshotsLoadingMeta", "0");
+        waitTillMetricHasValue(snapshotMetricSource, 
"IncomingSnapshotsWaitingCatalog", "1");
+
+        catalogReadyFuture.complete(null);
+
+        waitTillMetricHasValue(snapshotMetricSource, 
"IncomingSnapshotsWaitingCatalog", "0");
+        waitTillMetricHasValue(snapshotMetricSource, 
"IncomingSnapshotsLoadingMvData", "1");
+
+        loadMvDataFuture.complete(mvDataResponse);
+
+        waitTillMetricHasValue(snapshotMetricSource, 
"IncomingSnapshotsLoadingMvData", "0");
+        waitTillMetricHasValue(snapshotMetricSource, 
"IncomingSnapshotsLoadingTxMeta", "1");
+
+        loadTxMetaFuture.complete(txMetaResponse);
+
+        waitTillMetricHasValue(snapshotMetricSource, 
"IncomingSnapshotsLoadingTxMeta", "0");
+
+        copier.join();
+
+        waitTillMetricHasValue(snapshotMetricSource, "IncomingSnapshots", "0");
+    }
+
     private static void assertThatTargetStoragesAreEmpty(
             MvTableStorage incomingMvTableStorage,
             TxStateStorage incomingTxStateStorage
@@ -840,4 +945,46 @@ public class IncomingSnapshotCopierTest extends 
BaseIgniteAbstractTest {
     private static UUID generateTxId() {
         return TransactionIds.transactionId(CLOCK.now(), 1);
     }
+
+    private static Metric 
retrieveOutgoingSnapshotMetric(RaftSnapshotsMetricsSource 
snapshotsMetricsSource, String metricName) {
+        return stream(snapshotsMetricsSource.holder().metrics().spliterator(), 
false)
+                .filter(metric -> metricName.equals(metric.name()))
+                .findAny()
+                .get();
+    }
+
+    private static void waitTillMetricHasValue(
+            RaftSnapshotsMetricsSource snapshotsMetricsSource,
+            String metricName,
+            String expectedValue
+    ) {
+        Metric metric = retrieveOutgoingSnapshotMetric(snapshotsMetricsSource, 
metricName);
+
+        await().until(metric::getValueAsString, is(expectedValue));
+    }
+
+    private MessagingService messagingServiceForMetrics(
+            CompletableFuture<NetworkMessage> loadSnapshotMetaFuture,
+            CompletableFuture<NetworkMessage> loadMvDataFuture,
+            CompletableFuture<NetworkMessage> loadTxMetaFuture
+    ) {
+        MessagingService messagingService = mock(MessagingService.class);
+
+        GetLowWatermarkResponse getLowWatermarkResponse = 
mock(GetLowWatermarkResponse.class);
+        
when(getLowWatermarkResponse.lowWatermark()).thenReturn(HybridTimestamp.NULL_HYBRID_TIMESTAMP);
+
+        when(messagingService.invoke(eq(clusterNode), 
any(SnapshotMetaRequest.class), anyLong()))
+                .thenReturn(loadSnapshotMetaFuture);
+
+        when(messagingService.invoke(eq(clusterNode), 
any(SnapshotMvDataRequest.class), anyLong()))
+                .thenReturn(loadMvDataFuture);
+
+        when(messagingService.invoke(eq(clusterNode), 
any(SnapshotTxDataRequest.class), anyLong()))
+                .thenReturn(loadTxMetaFuture);
+
+        when(messagingService.invoke(eq(clusterNode), 
any(GetLowWatermarkRequest.class), anyLong()))
+                .thenReturn(completedFuture(getLowWatermarkResponse));
+
+        return messagingService;
+    }
 }
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSourceTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSourceTest.java
new file mode 100644
index 00000000000..583c69161d8
--- /dev/null
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSourceTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests metric source name and outgoing snapshot metric names.
+ * If you want to change the name, or add a new metric, please remember to 
update the corresponding documentation.
+ */
+class RaftSnapshotsMetricsSourceTest {
+    @Test
+    void snapshotMetricsSourceName() {
+        var metricSource = new RaftSnapshotsMetricsSource();
+
+        assertThat(metricSource.name(), is("raft.snapshots"));
+    }
+
+    @Test
+    void testMetricNames() {
+        var metricSource = new RaftSnapshotsMetricsSource();
+
+        MetricSet set = metricSource.enable();
+
+        assertThat(set, is(notNullValue()));
+
+        Set<String> expectedMetrics = Set.of(
+                "IncomingSnapshotsLoadingMeta",
+                "IncomingSnapshotsLoadingTxMeta",
+                "IncomingSnapshotsPreparingIndexForBuild",
+                "IncomingSnapshots",
+                "IncomingSnapshotsLoadingMvData",
+                "IncomingSnapshotsPreparingStorages",
+                "OutgoingSnapshots",
+                "IncomingSnapshotsWaitingCatalog"
+        );
+
+        var actualMetrics = new HashSet<String>();
+        set.forEach(m -> actualMetrics.add(m.name()));
+
+        assertThat(actualMetrics, is(expectedMetrics));
+    }
+}
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
index f03cf6c4d2b..8d39978dd80 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
 
+import static java.util.stream.StreamSupport.stream;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
@@ -32,6 +33,7 @@ import java.util.UUID;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.metrics.Metric;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse;
@@ -39,6 +41,7 @@ import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKe
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.storage.lease.LeaseInfo;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -72,6 +75,8 @@ class OutgoingSnapshotCommonTest extends 
BaseIgniteAbstractTest {
 
     private static final int REQUIRED_CATALOG_VERSION = 42;
 
+    private RaftSnapshotsMetricsSource snapshotsMetricsSource;
+
     @BeforeEach
     void createTestInstance(
             @Mock Catalog catalog,
@@ -86,12 +91,17 @@ class OutgoingSnapshotCommonTest extends 
BaseIgniteAbstractTest {
         partitionsByTableId.put(TABLE_ID_1, partitionAccess1);
         partitionsByTableId.put(TABLE_ID_2, partitionAccess2);
 
+        UUID snapshotId = UUID.randomUUID();
+
+        snapshotsMetricsSource = new RaftSnapshotsMetricsSource();
+
         snapshot = new OutgoingSnapshot(
-                UUID.randomUUID(),
+                snapshotId,
                 partitionKey,
                 partitionsByTableId,
                 mock(PartitionTxStateAccess.class),
-                catalogService
+                catalogService,
+                snapshotsMetricsSource
         );
     }
 
@@ -179,4 +189,44 @@ class OutgoingSnapshotCommonTest extends 
BaseIgniteAbstractTest {
 
         assertThat(getNullableSnapshotMetaResponse(), is(nullValue()));
     }
+
+    @Test
+    void metricsCalculateCorrectly() {
+        when(partitionAccess1.committedGroupConfiguration()).thenReturn(new 
RaftGroupConfiguration(
+                13L, 37L, 111L, 110L, List.of(), List.of(), null, null
+        ));
+
+        // Given metric source enabled.
+
+        snapshotsMetricsSource.enable();
+
+        Metric metric = retrieveOutgoingSnapshotMetric();
+
+        // Before outgoing snapshot is started TotalOutgoingSnapshots metric 
should return 0.
+
+        assertThat(metric.getValueAsString(), is("0"));
+
+        // After outgoing snapshot is started TotalOutgoingSnapshots metric 
should return 1.
+
+        snapshot.freezeScopeUnderMvLock();
+
+        metric = retrieveOutgoingSnapshotMetric();
+
+        assertThat(metric.getValueAsString(), is("1"));
+
+        // And finally after outgoing snapshot is closed 
TotalOutgoingSnapshots metric should return 0.
+
+        snapshot.close();
+
+        metric = retrieveOutgoingSnapshotMetric();
+
+        assertThat(metric.getValueAsString(), is("0"));
+    }
+
+    private Metric retrieveOutgoingSnapshotMetric() {
+        return stream(snapshotsMetricsSource.holder().metrics().spliterator(), 
false)
+                .filter(metric -> "OutgoingSnapshots".equals(metric.name()))
+                .findAny()
+                .get();
+    }
 }
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
index d49b02355f5..f857fdabbe4 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
@@ -50,6 +50,7 @@ import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKe
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowImpl;
@@ -122,12 +123,15 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
         partitionsByTableId.put(TABLE_ID_1, partitionAccess1);
         partitionsByTableId.put(TABLE_ID_2, partitionAccess2);
 
+        UUID snapshotId = UUID.randomUUID();
+
         snapshot = new OutgoingSnapshot(
-                UUID.randomUUID(),
+                snapshotId,
                 partitionKey,
                 partitionsByTableId,
                 mock(PartitionTxStateAccess.class),
-                catalogService
+                catalogService,
+                new RaftSnapshotsMetricsSource()
         );
 
         snapshot.acquireMvLock();
@@ -157,12 +161,15 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
     ) {
         
when(txStateAccess.committedGroupConfiguration()).thenReturn(raftGroupConfiguration);
 
+        UUID snapshotId = UUID.randomUUID();
+
         snapshot = new OutgoingSnapshot(
-                UUID.randomUUID(),
+                snapshotId,
                 partitionKey,
                 Int2ObjectMaps.emptyMap(),
                 txStateAccess,
-                catalogService
+                catalogService,
+                new RaftSnapshotsMetricsSource()
         );
 
         snapshot.acquireMvLock();
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
index b7283ad58e2..f6bb26e7853 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
@@ -37,6 +37,7 @@ import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMv
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
@@ -78,15 +79,20 @@ public class OutgoingSnapshotReaderTest extends 
BaseIgniteAbstractTest {
             return null;
         }).when(outgoingSnapshotsManager).startOutgoingSnapshot(any(), any());
 
+        var partitionKey = new ZonePartitionKey(0, 0);
+
+        var snapshotMetricsSource = new RaftSnapshotsMetricsSource();
+
         var snapshotStorage = new PartitionSnapshotStorage(
-                new ZonePartitionKey(0, 0),
+                partitionKey,
                 mock(TopologyService.class),
                 outgoingSnapshotsManager,
                 txStateAccess,
                 catalogService,
                 mock(FailureProcessor.class),
                 mock(Executor.class),
-                mock(LogStorageAccess.class)
+                mock(LogStorageAccess.class),
+                snapshotMetricsSource
         );
 
         snapshotStorage.addMvPartition(TABLE_ID_1, partitionAccess1);
@@ -100,7 +106,9 @@ public class OutgoingSnapshotReaderTest extends 
BaseIgniteAbstractTest {
         lenient().when(partitionAccess1.lastAppliedTerm()).thenReturn(2L);
         lenient().when(partitionAccess2.lastAppliedTerm()).thenReturn(3L);
 
-        try (var reader = new OutgoingSnapshotReader(UUID.randomUUID(), 
snapshotStorage)) {
+        UUID snapshotId = UUID.randomUUID();
+
+        try (var reader = new OutgoingSnapshotReader(snapshotId, 
snapshotStorage, snapshotMetricsSource)) {
             SnapshotMeta meta = reader.load();
             assertEquals(10L, meta.lastIncludedIndex());
             assertEquals(1L, meta.lastIncludedTerm());
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
index ed2329e09f5..212f31bbef8 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
@@ -51,6 +51,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDa
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.TablePartitionKey;
@@ -107,12 +108,15 @@ class OutgoingSnapshotTxDataStreamingTest extends 
BaseIgniteAbstractTest {
 
         
lenient().when(catalogService.catalog(anyInt())).thenReturn(mock(Catalog.class));
 
+        UUID snapshotId = UUID.randomUUID();
+
         snapshot = new OutgoingSnapshot(
-                UUID.randomUUID(),
+                snapshotId,
                 partitionKey,
                 singleton(1, partitionAccess),
                 txAccess,
-                catalogService
+                catalogService,
+                new RaftSnapshotsMetricsSource()
         );
     }
 
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
index 8bafdceaf0e..a4dbc22532d 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.catalog.CatalogService;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.TablePartitionKey;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -86,12 +87,15 @@ class OutgoingSnapshotsManagerTest extends 
BaseIgniteAbstractTest {
 
         when(catalogService.catalog(anyInt())).thenReturn(mock(Catalog.class));
 
+        UUID snapshotId = UUID.randomUUID();
+
         OutgoingSnapshot snapshot = new OutgoingSnapshot(
-                UUID.randomUUID(),
+                snapshotId,
                 partitionKey,
                 singleton(TABLE_ID, partitionAccess),
                 mock(PartitionTxStateAccess.class),
-                catalogService
+                catalogService,
+                new RaftSnapshotsMetricsSource()
         );
 
         assertDoesNotThrow(() -> 
manager.startOutgoingSnapshot(UUID.randomUUID(), snapshot));
diff --git 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
index 34d6d674812..f4aff6292ca 100644
--- 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
+++ 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
@@ -70,7 +70,8 @@ class ItMetricControllerTest extends 
ClusterPerClassIntegrationTest {
             new MetricSource("resource.vacuum", true),
             new MetricSource("placement-driver", true),
             new MetricSource("clock.service", true),
-            new MetricSource("index.builder", true)
+            new MetricSource("index.builder", true),
+            new MetricSource("raft.snapshots", true)
     };
 
     @Inject
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index fb06a696c06..027fe98e9f6 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -763,7 +763,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 txManager,
                 schemaManager,
                 dataStorageManager,
-                outgoingSnapshotManager
+                outgoingSnapshotManager,
+                metricManager
         );
 
         TableManager tableManager = new TableManager(
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 959cbe912f9..b2b8c53eeb3 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1101,7 +1101,8 @@ public class IgniteImpl implements Ignite {
                 txManager,
                 schemaManager,
                 dataStorageMgr,
-                outgoingSnapshotsManager
+                outgoingSnapshotsManager,
+                metricManager
         );
 
         systemViewManager.register(txManager);
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index bfd49f6aeaf..0c2a49a435c 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -500,7 +500,9 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
 
         indexMetaStorage = new IndexMetaStorage(catalogManager, lowWatermark, 
metaStorageManager);
 
-        dsm = createDataStorageManager();
+        var metricManager = new NoOpMetricManager();
+
+        dsm = createDataStorageManager(metricManager);
 
         AlwaysSyncedSchemaSyncService schemaSyncService = new 
AlwaysSyncedSchemaSyncService();
 
@@ -543,7 +545,8 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 txManager,
                 sm,
                 dsm,
-                outgoingSnapshotManager
+                outgoingSnapshotManager,
+                metricManager
         ));
 
         tableManager = new TableManager(
@@ -583,7 +586,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 new SystemPropertiesNodeProperties(),
                 minTimeCollectorService,
                 systemDistributedConfiguration,
-                new NoOpMetricManager(),
+                metricManager,
                 TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
         ) {
 
@@ -648,7 +651,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
         );
     }
 
-    private DataStorageManager createDataStorageManager() {
+    private DataStorageManager createDataStorageManager(MetricManager 
metricManager) {
         ConfigurationRegistry mockedRegistry = 
mock(ConfigurationRegistry.class);
 
         
when(mockedRegistry.getConfiguration(NodeConfiguration.KEY)).thenReturn(nodeConfiguration);
@@ -658,7 +661,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
         DataStorageManager manager = new DataStorageManager(
                 dataStorageModules.createStorageEngines(
                         NODE_NAME,
-                        mock(MetricManager.class),
+                        metricManager,
                         mockedRegistry,
                         workDir,
                         null,

Reply via email to