This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new aab140ff2aa IGNITE-27150 Add raft snapshot metrics (#7269)
aab140ff2aa is described below
commit aab140ff2aa31083150fbb6cdeed79bc22069639
Author: Ivan Zlenko <[email protected]>
AuthorDate: Tue Dec 23 11:41:29 2025 +0500
IGNITE-27150 Add raft snapshot metrics (#7269)
---
.../administrators-guide/metrics/metrics-list.adoc | 18 ++
.../ignite/internal/cli/CliIntegrationTest.java | 3 +-
.../rebalance/ItRebalanceDistributedTest.java | 3 +-
modules/partition-replicator/build.gradle | 3 +
.../partition/replicator/fixtures/Node.java | 3 +-
.../ItZonePartitionRaftListenerRecoveryTest.java | 4 +-
.../PartitionReplicaLifecycleManager.java | 19 +-
.../partition/replicator/ZoneResourcesManager.java | 10 +-
.../raft/snapshot/PartitionSnapshotStorage.java | 25 +-
.../snapshot/incoming/IncomingSnapshotCopier.java | 23 +-
.../metrics/RaftSnapshotsMetricsSource.java | 280 +++++++++++++++++++++
.../raft/snapshot/outgoing/OutgoingSnapshot.java | 9 +-
.../snapshot/outgoing/OutgoingSnapshotReader.java | 10 +-
.../outgoing/OutgoingSnapshotsManager.java | 2 +
.../PartitionReplicaLifecycleManagerTest.java | 4 +-
.../PartitionSnapshotStorageFactoryTest.java | 4 +-
.../snapshot/PartitionSnapshotStorageTest.java | 4 +-
.../incoming/IncomingSnapshotCopierTest.java | 151 ++++++++++-
.../metrics/RaftSnapshotsMetricsSourceTest.java | 65 +++++
.../outgoing/OutgoingSnapshotCommonTest.java | 54 +++-
.../OutgoingSnapshotMvDataStreamingTest.java | 15 +-
.../outgoing/OutgoingSnapshotReaderTest.java | 14 +-
.../OutgoingSnapshotTxDataStreamingTest.java | 8 +-
.../outgoing/OutgoingSnapshotsManagerTest.java | 8 +-
.../rest/metrics/ItMetricControllerTest.java | 3 +-
.../runner/app/ItIgniteNodeRestartTest.java | 3 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
.../distributed/TableManagerRecoveryTest.java | 13 +-
28 files changed, 718 insertions(+), 43 deletions(-)
diff --git a/docs/_docs/administrators-guide/metrics/metrics-list.adoc
b/docs/_docs/administrators-guide/metrics/metrics-list.adoc
index abe2bbe1767..ffacf503684 100644
--- a/docs/_docs/administrators-guide/metrics/metrics-list.adoc
+++ b/docs/_docs/administrators-guide/metrics/metrics-list.adoc
@@ -296,3 +296,21 @@ Transaction metrics.
| LocalUnrebalancedPartitionsCount | The number of partitions that should be
moved to this node.
| TotalUnrebalancedPartitionsCount | The total number of partitions that
should be moved to a new owner.
|=======================================================================
+
+== raft.snapshots
+
+Metrics related to Raft snapshots of partition replicas.
+
+[width="100%",cols="20%,80%",opts="header"]
+|=======================================================================
+| Metric name | Description
+
+| IncomingSnapshots | The number of incoming Raft snapshots in progress.
+| IncomingSnapshotsLoadingMeta | The number of incoming Raft snapshots loading
metadata.
+| IncomingSnapshotsWaitingCatalog | The number of incoming Raft snapshots
waiting for catalog.
+| IncomingSnapshotsPreparingStorages | The number of incoming Raft snapshots
preparing storages.
+| IncomingSnapshotsPreparingIndexForBuild | The number of incoming Raft
snapshots preparing indexes for build.
+| IncomingSnapshotsLoadingMvData | The number of incoming Raft snapshots
loading multi-versioned data.
+| IncomingSnapshotsLoadingTxMeta | The number of incoming Raft snapshots
loading transaction metadata.
+| OutgoingSnapshots | The number of outgoing Raft snapshots in progress.
+|=======================================================================
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
index 7f246c67b79..7fb227df5e3 100644
---
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
@@ -82,7 +82,8 @@ public abstract class CliIntegrationTest extends
ClusterPerClassIntegrationTest
new MetricSource().name("placement-driver").enabled(true),
new MetricSource().name("resource.vacuum").enabled(true),
new MetricSource().name("clock.service").enabled(true),
- new MetricSource().name("index.builder").enabled(true)
+ new MetricSource().name("index.builder").enabled(true),
+ new MetricSource().name("raft.snapshots").enabled(true)
};
/** Correct ignite jdbc url. */
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 860617f9185..06feac04799 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1556,7 +1556,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
txManager,
schemaManager,
dataStorageMgr,
- outgoingSnapshotManager
+ outgoingSnapshotManager,
+ metricManager
);
tableManager = new TableManager(
diff --git a/modules/partition-replicator/build.gradle
b/modules/partition-replicator/build.gradle
index 896a6cbe843..8398351ed41 100644
--- a/modules/partition-replicator/build.gradle
+++ b/modules/partition-replicator/build.gradle
@@ -52,6 +52,7 @@ dependencies {
testImplementation testFixtures(project(':ignite-core'))
testImplementation testFixtures(project(':ignite-low-watermark'))
testImplementation testFixtures(project(':ignite-metastorage'))
+ testImplementation testFixtures(project(':ignite-metrics'))
testImplementation testFixtures(project(':ignite-schema'))
testImplementation testFixtures(project(':ignite-storage-api'))
testImplementation testFixtures(project(':ignite-table'))
@@ -60,6 +61,8 @@ dependencies {
testImplementation testFixtures(project(':ignite-raft'))
testImplementation testFixtures(project(':ignite-distribution-zones'))
+ testImplementation libs.awaitility
+
integrationTestImplementation
testFixtures(project(':ignite-cluster-management'))
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation
testFixtures(project(':ignite-configuration'))
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index d18eae051cd..d2596ffc661 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -744,7 +744,8 @@ public class Node {
txManager,
schemaManager,
dataStorageMgr,
- outgoingSnapshotsManager
+ outgoingSnapshotsManager,
+ metricManager
);
resourceVacuumManager = new ResourceVacuumManager(
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
index 5c7621af9e5..1aa7266a88d 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/raft/ItZonePartitionRaftListenerRecoveryTest.java
@@ -83,6 +83,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSn
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorageFactory;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.placementdriver.LeasePlacementDriver;
import org.apache.ignite.internal.raft.Loza;
@@ -311,7 +312,8 @@ class ItZonePartitionRaftListenerRecoveryTest extends
IgniteAbstractTest {
catalogService,
failureProcessor,
executor,
- new LogStorageAccessImpl(replicaManager)
+ new LogStorageAccessImpl(replicaManager),
+ new RaftSnapshotsMetricsSource()
);
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index a423ffea41e..d96825663c8 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -127,6 +127,7 @@ import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.network.TopologyService;
@@ -265,6 +266,8 @@ public class PartitionReplicaLifecycleManager extends
private final ZoneResourcesManager zoneResourcesManager;
+ private final MetricManager metricManager;
+
private final ReliableCatalogVersions reliableCatalogVersions;
private final EventListener<CreateZoneEventParameters>
onCreateZoneListener = this::onCreateZone;
@@ -295,6 +298,7 @@ public class PartitionReplicaLifecycleManager extends
* @param schemaManager Schema manager.
* @param dataStorageManager Data storage manager.
* @param outgoingSnapshotsManager Outgoing snapshots manager.
+ * @param metricManager Metric manager.
*/
public PartitionReplicaLifecycleManager(
CatalogService catalogService,
@@ -316,7 +320,8 @@ public class PartitionReplicaLifecycleManager extends
TxManager txManager,
SchemaManager schemaManager,
DataStorageManager dataStorageManager,
- OutgoingSnapshotsManager outgoingSnapshotsManager
+ OutgoingSnapshotsManager outgoingSnapshotsManager,
+ MetricManager metricManager
) {
this(
catalogService,
@@ -346,7 +351,8 @@ public class PartitionReplicaLifecycleManager extends
failureProcessor,
partitionOperationsExecutor,
replicaMgr
- )
+ ),
+ metricManager
);
}
@@ -370,7 +376,8 @@ public class PartitionReplicaLifecycleManager extends
TxManager txManager,
SchemaManager schemaManager,
DataStorageManager dataStorageManager,
- ZoneResourcesManager zoneResourcesManager
+ ZoneResourcesManager zoneResourcesManager,
+ MetricManager metricManager
) {
this.catalogService = catalogService;
this.replicaMgr = replicaMgr;
@@ -390,6 +397,7 @@ public class PartitionReplicaLifecycleManager extends
this.schemaManager = schemaManager;
this.dataStorageManager = dataStorageManager;
this.zoneResourcesManager = zoneResourcesManager;
+ this.metricManager = metricManager;
rebalanceRetryDelayConfiguration = new
SystemDistributedConfigurationPropertyHolder<>(
systemDistributedConfiguration,
@@ -431,6 +439,9 @@ public class PartitionReplicaLifecycleManager extends
executorInclinedPlacementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
onPrimaryReplicaExpiredListener);
+
metricManager.registerSource(zoneResourcesManager.snapshotsMetricsSource());
+ metricManager.enable(zoneResourcesManager.snapshotsMetricsSource());
+
return processZonesAndAssignmentsOnStart;
}
@@ -1657,6 +1668,8 @@ public class PartitionReplicaLifecycleManager extends
}
try {
+
metricManager.unregisterSource(zoneResourcesManager.snapshotsMetricsSource());
+
IgniteUtils.closeAllManually(zoneResourcesManager);
} catch (Exception e) {
return failedFuture(e);
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index 51c8efe2cea..969573e705e 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -34,6 +34,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageA
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ZonePartitionId;
@@ -70,6 +71,8 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
private final ReplicaManager replicaManager;
+ private final RaftSnapshotsMetricsSource snapshotsMetricsSource = new
RaftSnapshotsMetricsSource();
+
/** Map from zone IDs to their resource holders. */
private final Map<Integer, ZoneResources> resourcesByZoneId = new
ConcurrentHashMap<>();
@@ -128,7 +131,8 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
catalogService,
failureProcessor,
partitionOperationsExecutor,
- new LogStorageAccessImpl(replicaManager)
+ new LogStorageAccessImpl(replicaManager),
+ snapshotsMetricsSource
);
var zonePartitionResources = new ZonePartitionResources(
@@ -218,6 +222,10 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
return resources.txStateStorage.getPartitionStorage(partitionId);
}
+ RaftSnapshotsMetricsSource snapshotsMetricsSource() {
+ return snapshotsMetricsSource;
+ }
+
private static class ZoneResources {
final TxStateStorage txStateStorage;
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
index 6cf34dde4a1..705c598a1ef 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotReader;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
@@ -100,6 +101,8 @@ public class PartitionSnapshotStorage {
private final LogStorageAccess logStorage;
+ private final RaftSnapshotsMetricsSource snapshotsMetricsSource;
+
/** Constructor. */
public PartitionSnapshotStorage(
ZonePartitionKey partitionKey,
@@ -109,7 +112,8 @@ public class PartitionSnapshotStorage {
CatalogService catalogService,
FailureProcessor failureProcessor,
Executor incomingSnapshotsExecutor,
- LogStorageAccess logStorage
+ LogStorageAccess logStorage,
+ RaftSnapshotsMetricsSource snapshotsMetricsSource
) {
this(
partitionKey,
@@ -120,7 +124,8 @@ public class PartitionSnapshotStorage {
failureProcessor,
incomingSnapshotsExecutor,
DEFAULT_WAIT_FOR_METADATA_CATCHUP_MS,
- logStorage
+ logStorage,
+ snapshotsMetricsSource
);
}
@@ -134,7 +139,8 @@ public class PartitionSnapshotStorage {
FailureProcessor failureProcessor,
Executor incomingSnapshotsExecutor,
long waitForMetadataCatchupMs,
- LogStorageAccess logStorage
+ LogStorageAccess logStorage,
+ RaftSnapshotsMetricsSource snapshotsMetricsSource
) {
this.partitionKey = partitionKey;
this.topologyService = topologyService;
@@ -145,6 +151,7 @@ public class PartitionSnapshotStorage {
this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
this.waitForMetadataCatchupMs = waitForMetadataCatchupMs;
this.logStorage = logStorage;
+ this.snapshotsMetricsSource = snapshotsMetricsSource;
}
public ZonePartitionKey partitionKey() {
@@ -231,7 +238,13 @@ public class PartitionSnapshotStorage {
SnapshotUri snapshotUri = SnapshotUri.fromStringUri(uri);
- var copier = new IncomingSnapshotCopier(this, snapshotUri,
incomingSnapshotsExecutor, waitForMetadataCatchupMs) {
+ var copier = new IncomingSnapshotCopier(
+ this,
+ snapshotUri,
+ incomingSnapshotsExecutor,
+ waitForMetadataCatchupMs,
+ snapshotsMetricsSource
+ ) {
@Override
public void close() {
try {
@@ -257,7 +270,7 @@ public class PartitionSnapshotStorage {
startSnapshotOperation(snapshotId);
- return new OutgoingSnapshotReader(snapshotId, this) {
+ return new OutgoingSnapshotReader(snapshotId, this,
snapshotsMetricsSource) {
@Override
public void close() throws IOException {
try {
@@ -282,6 +295,8 @@ public class PartitionSnapshotStorage {
LOG.info("Finishing outgoing snapshot [partitionKey={},
snapshotId={}]", partitionKey, snapshotId);
synchronized (snapshotOperationLock) {
+ LOG.info("Finishing outgoing snapshot [partitionKey={},
snapshotId={}]", partitionKey, snapshotId);
+
CompletableFuture<Void> operationFuture =
ongoingSnapshotOperations.remove(snapshotId);
assert operationFuture != null :
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
index 50bc86dbc4f..f9ffb208f04 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -74,6 +74,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMv
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupConfigurationSerializer;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -137,6 +138,8 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
private final IncomingSnapshotStats snapshotStats = new
IncomingSnapshotStats();
+ private final RaftSnapshotsMetricsSource snapshotsMetricsSource;
+
/**
* Constructor.
*
@@ -145,23 +148,27 @@ public class IncomingSnapshotCopier extends
SnapshotCopier {
* @param executor Thread pool for IO operations.
* @param waitForMetadataCatchupMs How much time to allow for metadata on
this node to reach the catalog version required by an
* incoming snapshot.
+ * @param snapshotsMetricsSource Raft snapshots metrics source.
*/
public IncomingSnapshotCopier(
PartitionSnapshotStorage partitionSnapshotStorage,
SnapshotUri snapshotUri,
Executor executor,
- long waitForMetadataCatchupMs
+ long waitForMetadataCatchupMs,
+ RaftSnapshotsMetricsSource snapshotsMetricsSource
) {
this.partitionSnapshotStorage = partitionSnapshotStorage;
this.snapshotUri = snapshotUri;
this.executor = executor;
this.throttledLogger = Loggers.toThrottledLogger(LOG, executor);
this.waitForMetadataCatchupMs = waitForMetadataCatchupMs;
+ this.snapshotsMetricsSource = snapshotsMetricsSource;
}
@Override
public void start() {
snapshotStats.onSnapshotInstallationStart();
+ snapshotsMetricsSource.onSnapshotInstallationStart();
if (LOG.isInfoEnabled()) {
LOG.info("Rebalance is started [snapshotId={}, {}]",
snapshotUri.snapshotId, createPartitionInfo());
@@ -226,6 +233,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
private CompletableFuture<?>
waitForMetadataWithTimeout(PartitionSnapshotMeta snapshotMeta) {
snapshotStats.onWaitingCatalogPhaseStart();
+ snapshotsMetricsSource.onWaitingCatalogPhaseStart();
if (LOG.isInfoEnabled()) {
LOG.info(
@@ -243,6 +251,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
return anyOf(metadataReadyFuture, readinessTimeoutFuture)
.whenComplete((ignored, throwable) -> {
snapshotStats.onWaitingCatalogPhaseEnd();
+ snapshotsMetricsSource.onWaitingCatalogPhaseEnd();
if (LOG.isInfoEnabled()) {
LOG.info(
@@ -351,6 +360,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
}
snapshotStats.onLoadSnapshotPhaseStart();
+ snapshotsMetricsSource.onLoadSnapshotMetaPhaseStart();
if (LOG.isInfoEnabled()) {
LOG.info("Start loading snapshot meta [snapshotId={}, {}]",
snapshotUri.snapshotId, createPartitionInfo());
@@ -365,6 +375,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
PartitionSnapshotMeta snapshotMeta = ((SnapshotMetaResponse)
response).meta();
snapshotStats.onLoadSnapshotPhaseEnd();
+ snapshotsMetricsSource.onLoadSnapshotMetaPhaseEnd();
if (LOG.isInfoEnabled()) {
LOG.info(
@@ -414,6 +425,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
}
snapshotStats.onLoadMvDataPhaseStart();
+ snapshotsMetricsSource.onLoadMvDataPhaseStart();
if (LOG.isInfoEnabled()) {
LOG.info(
@@ -453,6 +465,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
if (snapshotMvDataResponse.finish()) {
snapshotStats.onLoadMvDataPhaseEnd();
+ snapshotsMetricsSource.onLoadMvDataPhaseEnd();
if (LOG.isInfoEnabled()) {
LOG.info(
@@ -496,6 +509,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
}
snapshotStats.onLoadTxMetasPhaseStart();
+ snapshotsMetricsSource.onLoadTxMetasPhaseStart();
if (LOG.isInfoEnabled()) {
LOG.info(
@@ -537,6 +551,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
if (snapshotTxDataResponse.finish()) {
snapshotStats.onLoadTxMetasPhaseEnd();
+ snapshotsMetricsSource.onLoadTxMetasPhaseEnd();
if (LOG.isInfoEnabled()) {
LOG.info(
@@ -578,7 +593,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
* successful.
*/
private CompletableFuture<Void> completeRebalance(SnapshotContext
snapshotContext, @Nullable Throwable throwable) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-27428
snapshotStats.onSnapshotInstallationEnd();
+ snapshotsMetricsSource.onSnapshotInstallationEnd();
if (!busyLock.enterBusy()) {
if (isOk()) {
@@ -730,6 +747,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
}
snapshotStats.onSetRowIdToBuildPhaseStart();
+ snapshotsMetricsSource.onSetRowIdToBuildPhaseStart();
try {
Map<Integer, UUID> nextRowUuidToBuildByIndexId =
snapshotContext.meta.nextRowIdToBuildByIndexId();
@@ -772,6 +790,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
}
snapshotStats.onSetRowIdToBuildPhaseEnd();
+ snapshotsMetricsSource.onSetRowIdToBuildPhaseEnd();
if (LOG.isInfoEnabled()) {
LOG.info("Finished setting next row ID for index building
[snapshotId={}, {}, totalTime={}ms]",
@@ -817,6 +836,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
}
snapshotStats.onPreparingStoragePhaseStart();
+ snapshotsMetricsSource.onPreparingStoragePhaseStart();
if (LOG.isInfoEnabled()) {
LOG.info(
@@ -833,6 +853,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
).thenComposeAsync(unused ->
startRebalanceForReplicationLogStorages(snapshotContext), executor)
.whenComplete((ignore, throwable) -> {
snapshotStats.onPreparingStoragePhaseEnd();
+ snapshotsMetricsSource.onPreparingStoragePhaseEnd();
if (LOG.isInfoEnabled()) {
LOG.info(
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSource.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSource.java
new file mode 100644
index 00000000000..db2ee6de014
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSource.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntSupplier;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.IntGauge;
+import org.apache.ignite.internal.metrics.Metric;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource.Holder;
+
+/**
+ * Metric source that exposes counters related to Raft snapshots lifecycle for
partition replicator.
+ *
+ * <p>The source is registered under the name {@code raft.snapshots}. It
maintains the number of currently
+ * running incoming and outgoing snapshots and per-phase counters for the
installation of incoming snapshots.
+ * These counters are intended to help understand where time is spent during
snapshot installation and
+ * whether there are any bottlenecks (for example, waiting for catalog,
loading multi-versioned data, etc.).
+ */
+public class RaftSnapshotsMetricsSource extends AbstractMetricSource<Holder> {
+ private final AtomicInteger incomingSnapshotsCounter = new AtomicInteger();
+
+ private final AtomicInteger snapshotsLoadingMetaCounter = new
AtomicInteger();
+
+ private final AtomicInteger snapshotsWaitingCatalogCounter = new
AtomicInteger();
+
+ private final AtomicInteger snapshotsPreparingStoragesCounter = new
AtomicInteger();
+
+ private final AtomicInteger snapshotsPreparingIndexForBuildCounter = new
AtomicInteger();
+
+ private final AtomicInteger snapshotsLoadingMvDataCounter = new
AtomicInteger();
+
+ private final AtomicInteger snapshotsLoadingTxMetaCounter = new
AtomicInteger();
+
+ private final AtomicInteger outgoingSnapshotsCounter = new AtomicInteger();
+
+ /**
+ * Creates a new metric source with the name {@code raft.snapshots}.
+ */
+ public RaftSnapshotsMetricsSource() {
+ super("raft.snapshots");
+ }
+
+ @Override
+ protected Holder createHolder() {
+ return new Holder(
+ incomingSnapshotsCounter::get,
+ snapshotsLoadingMetaCounter::get,
+ snapshotsWaitingCatalogCounter::get,
+ snapshotsPreparingStoragesCounter::get,
+ snapshotsPreparingIndexForBuildCounter::get,
+ snapshotsLoadingMvDataCounter::get,
+ snapshotsLoadingTxMetaCounter::get,
+ outgoingSnapshotsCounter::get
+ );
+ }
+
+ /**
+ * Marks the start of an incoming snapshot installation.
+ */
+ public void onSnapshotInstallationStart() {
+ incomingSnapshotsCounter.incrementAndGet();
+ }
+
+ /**
+ * Marks the end of an incoming snapshot installation.
+ */
+ public void onSnapshotInstallationEnd() {
+ incomingSnapshotsCounter.decrementAndGet();
+ }
+
+ /**
+ * Marks the beginning of the "load snapshot metadata" phase during
incoming snapshot installation.
+ */
+ public void onLoadSnapshotMetaPhaseStart() {
+ snapshotsLoadingMetaCounter.incrementAndGet();
+ }
+
+ /**
+ * Marks the end of the "load snapshot metadata" phase during incoming
snapshot installation.
+ */
+ public void onLoadSnapshotMetaPhaseEnd() {
+ snapshotsLoadingMetaCounter.decrementAndGet();
+ }
+
+ /**
+ * Marks the beginning of the phase where the node waits for catalog to be
ready/apply updates
+ * for the incoming snapshot installation.
+ */
+ public void onWaitingCatalogPhaseStart() {
+ snapshotsWaitingCatalogCounter.incrementAndGet();
+ }
+
+ /**
+ * Marks the end of the "waiting for catalog" phase during incoming
snapshot installation.
+ */
+ public void onWaitingCatalogPhaseEnd() {
+ snapshotsWaitingCatalogCounter.decrementAndGet();
+ }
+
+ /**
+ * Marks the beginning of the storage preparation phase for incoming
snapshot installation.
+ */
+ public void onPreparingStoragePhaseStart() {
+ snapshotsPreparingStoragesCounter.incrementAndGet();
+ }
+
+ /**
+ * Marks the end of the storage preparation phase for incoming snapshot
installation.
+ */
+ public void onPreparingStoragePhaseEnd() {
+ snapshotsPreparingStoragesCounter.decrementAndGet();
+ }
+
+ /**
+ * Marks the beginning of the phase where MV (multi-version) data is
loaded.
+ */
+ public void onLoadMvDataPhaseStart() {
+ snapshotsLoadingMvDataCounter.incrementAndGet();
+ }
+
+ /**
+ * Marks the end of the phase where MV (multi-version) data is loaded.
+ */
+ public void onLoadMvDataPhaseEnd() {
+ snapshotsLoadingMvDataCounter.decrementAndGet();
+ }
+
+ /**
+ * Marks the beginning of the phase where transaction metadata is loaded
from the snapshot.
+ */
+ public void onLoadTxMetasPhaseStart() {
+ snapshotsLoadingTxMetaCounter.incrementAndGet();
+ }
+
+ /**
+ * Marks the end of the phase where transaction metadata is loaded from
the snapshot.
+ */
+ public void onLoadTxMetasPhaseEnd() {
+ snapshotsLoadingTxMetaCounter.decrementAndGet();
+ }
+
+ /**
+ * Marks the beginning of preparing indexes for build as part of incoming
snapshot installation.
+ */
+ public void onSetRowIdToBuildPhaseStart() {
+ snapshotsPreparingIndexForBuildCounter.incrementAndGet();
+ }
+
+ /**
+ * Marks the end of preparing indexes for build as part of incoming
snapshot installation.
+ */
+ public void onSetRowIdToBuildPhaseEnd() {
+ snapshotsPreparingIndexForBuildCounter.decrementAndGet();
+ }
+
+ /**
+ * Marks the start of an outgoing snapshot creation/streaming.
+ */
+ public void onOutgoingSnapshotStart() {
+ outgoingSnapshotsCounter.incrementAndGet();
+ }
+
+ /**
+ * Marks the end of an outgoing snapshot creation/streaming.
+ */
+ public void onOutgoingSnapshotEnd() {
+ outgoingSnapshotsCounter.decrementAndGet();
+ }
+
+ /**
+ * Container of metrics exposed by {@link RaftSnapshotsMetricsSource}.
+ */
+ public static class Holder implements AbstractMetricSource.Holder<Holder> {
+ private final IntGauge incomingSnapshots;
+
+ private final IntGauge snapshotsLoadingMeta;
+
+ private final IntGauge snapshotsWaitingCatalog;
+
+ private final IntGauge snapshotsPreparingStorages;
+
+ private final IntGauge snapshotsPreparingIndexForBuild;
+
+ private final IntGauge snapshotsLoadingMvData;
+
+ private final IntGauge snapshotsLoadingTxMeta;
+
+ private final IntGauge outgoingSnapshots;
+
+ private Holder(
+ IntSupplier incomingSnapshotsSupplier,
+ IntSupplier snapshotsLoadingMetaSupplier,
+ IntSupplier snapshotsWaitingCatalogSupplier,
+ IntSupplier snapshotsPreparingStoragesSupplier,
+ IntSupplier snapshotsPreparingIndexForBuildSupplier,
+ IntSupplier snapshotsLoadingMvDataSupplier,
+ IntSupplier snapshotsLoadingTxMetaSupplier,
+ IntSupplier outgoingSnapshotsSupplier
+ ) {
+ incomingSnapshots = new IntGauge(
+ "IncomingSnapshots",
+ "Incoming Raft snapshots in progress",
+ incomingSnapshotsSupplier
+ );
+
+ snapshotsLoadingMeta = new IntGauge(
+ "IncomingSnapshotsLoadingMeta",
+ "Incoming Raft snapshots loading metadata",
+ snapshotsLoadingMetaSupplier
+ );
+
+ snapshotsWaitingCatalog = new IntGauge(
+ "IncomingSnapshotsWaitingCatalog",
+ "Incoming Raft snapshots waiting for catalog",
+ snapshotsWaitingCatalogSupplier
+ );
+
+ snapshotsPreparingStorages = new IntGauge(
+ "IncomingSnapshotsPreparingStorages",
+ "Incoming Raft snapshots preparing storages",
+ snapshotsPreparingStoragesSupplier
+ );
+
+ snapshotsPreparingIndexForBuild = new IntGauge(
+ "IncomingSnapshotsPreparingIndexForBuild",
+ "Incoming Raft snapshots preparing indexes for build",
+ snapshotsPreparingIndexForBuildSupplier
+ );
+
+ snapshotsLoadingMvData = new IntGauge(
+ "IncomingSnapshotsLoadingMvData",
+ "Incoming Raft snapshots loading multi-versioned data",
+ snapshotsLoadingMvDataSupplier
+ );
+
+ snapshotsLoadingTxMeta = new IntGauge(
+ "IncomingSnapshotsLoadingTxMeta",
+ "Incoming Raft snapshots loading transaction metadata",
+ snapshotsLoadingTxMetaSupplier
+ );
+
+ outgoingSnapshots = new IntGauge(
+ "OutgoingSnapshots",
+ "Outgoing Raft snapshots in progress",
+ outgoingSnapshotsSupplier
+ );
+ }
+
+ @Override
+ public Iterable<Metric> metrics() {
+ return List.of(
+ incomingSnapshots,
+ snapshotsLoadingMeta,
+ snapshotsWaitingCatalog,
+ snapshotsPreparingStorages,
+ snapshotsPreparingIndexForBuild,
+ snapshotsLoadingMvData,
+ snapshotsLoadingTxMeta,
+ outgoingSnapshots
+ );
+ }
+ }
+}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
index 026c6066822..2b82930b213 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -55,6 +55,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKe
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -145,6 +146,8 @@ public class OutgoingSnapshot {
private final OutgoingSnapshotStats snapshotStats;
+ private final RaftSnapshotsMetricsSource snapshotsMetricsSource;
+
/**
* Creates a new instance.
*/
@@ -153,7 +156,8 @@ public class OutgoingSnapshot {
PartitionKey partitionKey,
Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId,
PartitionTxStateAccess txState,
- CatalogService catalogService
+ CatalogService catalogService,
+ RaftSnapshotsMetricsSource snapshotMetricsSource
) {
this.id = id;
this.partitionKey = partitionKey;
@@ -161,6 +165,7 @@ public class OutgoingSnapshot {
this.txState = txState;
this.catalogService = catalogService;
this.snapshotStats = new OutgoingSnapshotStats(id, partitionKey);
+ this.snapshotsMetricsSource = snapshotMetricsSource;
}
/**
@@ -187,6 +192,7 @@ public class OutgoingSnapshot {
try {
snapshotStats.onSnapshotStart();
+ snapshotsMetricsSource.onOutgoingSnapshotStart();
int catalogVersion = catalogService.latestCatalogVersion();
@@ -662,6 +668,7 @@ public class OutgoingSnapshot {
busyLock.block();
snapshotStats.onSnapshotEnd();
+ snapshotsMetricsSource.onOutgoingSnapshotEnd();
snapshotStats.logSnapshotStats();
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReader.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReader.java
index 79b30bb0130..bee32530b0a 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReader.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReader.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
@@ -45,7 +46,11 @@ public class OutgoingSnapshotReader extends SnapshotReader {
/**
* Constructor.
*/
- public OutgoingSnapshotReader(UUID snapshotId, PartitionSnapshotStorage
snapshotStorage) {
+ public OutgoingSnapshotReader(
+ UUID snapshotId,
+ PartitionSnapshotStorage snapshotStorage,
+ RaftSnapshotsMetricsSource snapshotMetricsSource
+ ) {
this.snapshotStorage = snapshotStorage;
id = snapshotId;
@@ -55,7 +60,8 @@ public class OutgoingSnapshotReader extends SnapshotReader {
snapshotStorage.partitionKey(),
snapshotStorage.partitionsByTableId(),
snapshotStorage.txState(),
- snapshotStorage.catalogService()
+ snapshotStorage.catalogService(),
+ snapshotMetricsSource
);
LOG.info("Starting snapshot reader [{}, snapshotId={}]",
createPartitionInfo(), id);
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
index 313bd6a5726..fde77ca77bd 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
@@ -133,6 +133,8 @@ public class OutgoingSnapshotsManager implements
PartitionsSnapshots, IgniteComp
* @param outgoingSnapshot Outgoing snapshot.
*/
void startOutgoingSnapshot(UUID snapshotId, OutgoingSnapshot
outgoingSnapshot) {
+ LOG.info("Starting outgoing snapshot [snapshotId={}]", snapshotId);
+
snapshots.put(snapshotId, outgoingSnapshot);
PartitionSnapshotsImpl partitionSnapshots =
getPartitionSnapshots(outgoingSnapshot.partitionKey());
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index d892ee16fda..83b81368122 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.TopologyService;
@@ -282,7 +283,8 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
txManager,
schemaManager,
dataStorageManager,
- zoneResourcesManager
+ zoneResourcesManager,
+ new NoOpMetricManager()
);
var componentContext = new ComponentContext();
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
index 6373adc63af..4a95b5157b9 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactoryTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Executor;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.network.TopologyService;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.startup.StartupPartitionSnapshotReader;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
@@ -50,7 +51,8 @@ class PartitionSnapshotStorageFactoryTest extends
BaseIgniteAbstractTest {
mock(CatalogService.class),
mock(FailureProcessor.class),
mock(Executor.class),
- mock(LogStorageAccess.class)
+ mock(LogStorageAccess.class),
+ new RaftSnapshotsMetricsSource()
);
@Mock
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
index 014b76f1375..7f93c183908 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Executor;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.network.TopologyService;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -60,7 +61,8 @@ public class PartitionSnapshotStorageTest extends
BaseIgniteAbstractTest {
mock(CatalogService.class),
mock(FailureProcessor.class),
mock(Executor.class),
- mock(LogStorageAccess.class)
+ mock(LogStorageAccess.class),
+ new RaftSnapshotsMetricsSource()
);
@Test
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 00aa96c7ddb..6834e60daa6 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
+import static java.util.stream.StreamSupport.stream;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.SnapshotMetaUtils.snapshotMetaAt;
@@ -30,8 +31,10 @@ import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -74,16 +77,22 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.lowwatermark.message.GetLowWatermarkRequest;
+import org.apache.ignite.internal.lowwatermark.message.GetLowWatermarkResponse;
import
org.apache.ignite.internal.lowwatermark.message.LowWatermarkMessagesFactory;
+import org.apache.ignite.internal.metrics.Metric;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import
org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaRequest;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataRequest;
+import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse.ResponseEntry;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataRequest;
+import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDataResponse;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfo;
import
org.apache.ignite.internal.partition.replicator.raft.PartitionSnapshotInfoSerializer;
@@ -92,6 +101,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSn
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccessImpl;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.SnapshotUri;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.raft.RaftGroupConfigurationConverter;
@@ -386,6 +396,15 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
MvTableStorage incomingTableStorage,
TxStateStorage incomingTxStateStorage,
MessagingService messagingService
+ ) {
+ return createPartitionSnapshotStorage(incomingTableStorage,
incomingTxStateStorage, messagingService, catalogService);
+ }
+
+ private PartitionSnapshotStorage createPartitionSnapshotStorage(
+ MvTableStorage incomingTableStorage,
+ TxStateStorage incomingTxStateStorage,
+ MessagingService messagingService,
+ CatalogService catalogService
) {
TopologyService topologyService = mock(TopologyService.class);
@@ -404,7 +423,8 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
mock(FailureProcessor.class),
executorService,
0,
- new LogStorageAccessImpl(replicaManager)
+ new LogStorageAccessImpl(replicaManager),
+ new RaftSnapshotsMetricsSource()
);
storage.addMvPartition(TABLE_ID, spy(new PartitionMvStorageAccessImpl(
@@ -740,7 +760,8 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
partitionSnapshotStorage,
SnapshotUri.fromStringUri(SnapshotUri.toStringUri(snapshotId,
NODE_NAME)),
mock(Executor.class),
- 0
+ 0,
+ new RaftSnapshotsMetricsSource()
);
Thread anotherThread = new Thread(copier::cancel);
@@ -817,6 +838,90 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
assertThatTargetStoragesAreEmpty(incomingMvTableStorage,
incomingTxStateStorage);
}
+ @Test
+ void metricsCalculateCorrectly() throws InterruptedException {
+ incomingMvTableStorage.createMvPartition(PARTITION_ID);
+ incomingTxStateStorage.getOrCreatePartitionStorage(PARTITION_ID);
+
+ PartitionSnapshotMeta meta = mock(PartitionSnapshotMeta.class);
+
+ when(meta.requiredCatalogVersion()).thenReturn(1);
+
+ SnapshotMetaResponse metaResponse = mock(SnapshotMetaResponse.class);
+
+ when(metaResponse.meta()).thenReturn(meta);
+
+ CompletableFuture<NetworkMessage> loadSnapshotMetaFuture = new
CompletableFuture<>();
+
+ CompletableFuture<NetworkMessage> loadMvDataFuture = new
CompletableFuture<>();
+
+ CompletableFuture<NetworkMessage> loadTxMetaFuture = new
CompletableFuture<>();
+
+ SnapshotMvDataResponse mvDataResponse =
mock(SnapshotMvDataResponse.class);
+ when(mvDataResponse.finish()).thenReturn(true);
+
+ SnapshotTxDataResponse txMetaResponse =
mock(SnapshotTxDataResponse.class);
+ when(txMetaResponse.finish()).thenReturn(true);
+
+ MessagingService messagingService =
messagingServiceForMetrics(loadSnapshotMetaFuture, loadMvDataFuture,
loadTxMetaFuture);
+
+ CompletableFuture<Void> catalogReadyFuture = new CompletableFuture<>();
+
+ CatalogService catalogService = mock(CatalogService.class);
+
+
when(catalogService.catalogReadyFuture(anyInt())).thenReturn(catalogReadyFuture);
+
+ PartitionSnapshotStorage partitionSnapshotStorage =
createPartitionSnapshotStorage(
+ incomingMvTableStorage,
+ incomingTxStateStorage,
+ messagingService,
+ catalogService
+ );
+
+ var snapshotMetricSource = new RaftSnapshotsMetricsSource();
+
+ snapshotMetricSource.enable();
+
+ IncomingSnapshotCopier copier = new IncomingSnapshotCopier(
+ partitionSnapshotStorage,
+ SnapshotUri.fromStringUri(SnapshotUri.toStringUri(snapshotId,
NODE_NAME)),
+ executorService,
+ 1000,
+ snapshotMetricSource
+ );
+
+ waitTillMetricHasValue(snapshotMetricSource, "IncomingSnapshots", "0");
+ waitTillMetricHasValue(snapshotMetricSource,
"IncomingSnapshotsLoadingMeta", "0");
+
+ copier.start();
+
+ waitTillMetricHasValue(snapshotMetricSource, "IncomingSnapshots", "1");
+ waitTillMetricHasValue(snapshotMetricSource,
"IncomingSnapshotsLoadingMeta", "1");
+
+ loadSnapshotMetaFuture.complete(metaResponse);
+
+ waitTillMetricHasValue(snapshotMetricSource,
"IncomingSnapshotsLoadingMeta", "0");
+ waitTillMetricHasValue(snapshotMetricSource,
"IncomingSnapshotsWaitingCatalog", "1");
+
+ catalogReadyFuture.complete(null);
+
+ waitTillMetricHasValue(snapshotMetricSource,
"IncomingSnapshotsWaitingCatalog", "0");
+ waitTillMetricHasValue(snapshotMetricSource,
"IncomingSnapshotsLoadingMvData", "1");
+
+ loadMvDataFuture.complete(mvDataResponse);
+
+ waitTillMetricHasValue(snapshotMetricSource,
"IncomingSnapshotsLoadingMvData", "0");
+ waitTillMetricHasValue(snapshotMetricSource,
"IncomingSnapshotsLoadingTxMeta", "1");
+
+ loadTxMetaFuture.complete(txMetaResponse);
+
+ waitTillMetricHasValue(snapshotMetricSource,
"IncomingSnapshotsLoadingTxMeta", "0");
+
+ copier.join();
+
+ waitTillMetricHasValue(snapshotMetricSource, "IncomingSnapshots", "0");
+ }
+
private static void assertThatTargetStoragesAreEmpty(
MvTableStorage incomingMvTableStorage,
TxStateStorage incomingTxStateStorage
@@ -840,4 +945,46 @@ public class IncomingSnapshotCopierTest extends
BaseIgniteAbstractTest {
private static UUID generateTxId() {
return TransactionIds.transactionId(CLOCK.now(), 1);
}
+
+ private static Metric
retrieveOutgoingSnapshotMetric(RaftSnapshotsMetricsSource
snapshotsMetricsSource, String metricName) {
+ return stream(snapshotsMetricsSource.holder().metrics().spliterator(),
false)
+ .filter(metric -> metricName.equals(metric.name()))
+ .findAny()
+ .get();
+ }
+
+ private static void waitTillMetricHasValue(
+ RaftSnapshotsMetricsSource snapshotsMetricsSource,
+ String metricName,
+ String expectedValue
+ ) {
+ Metric metric = retrieveOutgoingSnapshotMetric(snapshotsMetricsSource,
metricName);
+
+ await().until(metric::getValueAsString, is(expectedValue));
+ }
+
+ private MessagingService messagingServiceForMetrics(
+ CompletableFuture<NetworkMessage> loadSnapshotMetaFuture,
+ CompletableFuture<NetworkMessage> loadMvDataFuture,
+ CompletableFuture<NetworkMessage> loadTxMetaFuture
+ ) {
+ MessagingService messagingService = mock(MessagingService.class);
+
+ GetLowWatermarkResponse getLowWatermarkResponse =
mock(GetLowWatermarkResponse.class);
+
when(getLowWatermarkResponse.lowWatermark()).thenReturn(HybridTimestamp.NULL_HYBRID_TIMESTAMP);
+
+ when(messagingService.invoke(eq(clusterNode),
any(SnapshotMetaRequest.class), anyLong()))
+ .thenReturn(loadSnapshotMetaFuture);
+
+ when(messagingService.invoke(eq(clusterNode),
any(SnapshotMvDataRequest.class), anyLong()))
+ .thenReturn(loadMvDataFuture);
+
+ when(messagingService.invoke(eq(clusterNode),
any(SnapshotTxDataRequest.class), anyLong()))
+ .thenReturn(loadTxMetaFuture);
+
+ when(messagingService.invoke(eq(clusterNode),
any(GetLowWatermarkRequest.class), anyLong()))
+ .thenReturn(completedFuture(getLowWatermarkResponse));
+
+ return messagingService;
+ }
}
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSourceTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSourceTest.java
new file mode 100644
index 00000000000..583c69161d8
--- /dev/null
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/metrics/RaftSnapshotsMetricsSourceTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests metric source name and outgoing snapshot metric names.
+ * If you want to change the name, or add a new metric, please remember to
update the corresponding documentation.
+ */
+class RaftSnapshotsMetricsSourceTest {
+ @Test
+ void snapshotMetricsSourceName() {
+ var metricSource = new RaftSnapshotsMetricsSource();
+
+ assertThat(metricSource.name(), is("raft.snapshots"));
+ }
+
+ @Test
+ void testMetricNames() {
+ var metricSource = new RaftSnapshotsMetricsSource();
+
+ MetricSet set = metricSource.enable();
+
+ assertThat(set, is(notNullValue()));
+
+ Set<String> expectedMetrics = Set.of(
+ "IncomingSnapshotsLoadingMeta",
+ "IncomingSnapshotsLoadingTxMeta",
+ "IncomingSnapshotsPreparingIndexForBuild",
+ "IncomingSnapshots",
+ "IncomingSnapshotsLoadingMvData",
+ "IncomingSnapshotsPreparingStorages",
+ "OutgoingSnapshots",
+ "IncomingSnapshotsWaitingCatalog"
+ );
+
+ var actualMetrics = new HashSet<String>();
+ set.forEach(m -> actualMetrics.add(m.name()));
+
+ assertThat(actualMetrics, is(expectedMetrics));
+ }
+}
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
index f03cf6c4d2b..8d39978dd80 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+import static java.util.stream.StreamSupport.stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@@ -32,6 +33,7 @@ import java.util.UUID;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.metrics.Metric;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaRequest;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse;
@@ -39,6 +41,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKe
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -72,6 +75,8 @@ class OutgoingSnapshotCommonTest extends
BaseIgniteAbstractTest {
private static final int REQUIRED_CATALOG_VERSION = 42;
+ private RaftSnapshotsMetricsSource snapshotsMetricsSource;
+
@BeforeEach
void createTestInstance(
@Mock Catalog catalog,
@@ -86,12 +91,17 @@ class OutgoingSnapshotCommonTest extends
BaseIgniteAbstractTest {
partitionsByTableId.put(TABLE_ID_1, partitionAccess1);
partitionsByTableId.put(TABLE_ID_2, partitionAccess2);
+ UUID snapshotId = UUID.randomUUID();
+
+ snapshotsMetricsSource = new RaftSnapshotsMetricsSource();
+
snapshot = new OutgoingSnapshot(
- UUID.randomUUID(),
+ snapshotId,
partitionKey,
partitionsByTableId,
mock(PartitionTxStateAccess.class),
- catalogService
+ catalogService,
+ snapshotsMetricsSource
);
}
@@ -179,4 +189,44 @@ class OutgoingSnapshotCommonTest extends
BaseIgniteAbstractTest {
assertThat(getNullableSnapshotMetaResponse(), is(nullValue()));
}
+
+ @Test
+ void metricsCalculateCorrectly() {
+ when(partitionAccess1.committedGroupConfiguration()).thenReturn(new
RaftGroupConfiguration(
+ 13L, 37L, 111L, 110L, List.of(), List.of(), null, null
+ ));
+
+ // Given metric source enabled.
+
+ snapshotsMetricsSource.enable();
+
+ Metric metric = retrieveOutgoingSnapshotMetric();
+
+ // Before outgoing snapshot is started TotalOutgoingSnapshots metric
should return 0.
+
+ assertThat(metric.getValueAsString(), is("0"));
+
+ // After outgoing snapshot is started TotalOutgoingSnapshots metric
should return 1.
+
+ snapshot.freezeScopeUnderMvLock();
+
+ metric = retrieveOutgoingSnapshotMetric();
+
+ assertThat(metric.getValueAsString(), is("1"));
+
+ // And finally after outgoing snapshot is closed
TotalOutgoingSnapshots metric should return 0.
+
+ snapshot.close();
+
+ metric = retrieveOutgoingSnapshotMetric();
+
+ assertThat(metric.getValueAsString(), is("0"));
+ }
+
+ private Metric retrieveOutgoingSnapshotMetric() {
+ return stream(snapshotsMetricsSource.holder().metrics().spliterator(),
false)
+ .filter(metric -> "OutgoingSnapshots".equals(metric.name()))
+ .findAny()
+ .get();
+ }
}
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
index d49b02355f5..f857fdabbe4 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
@@ -50,6 +50,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKe
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowImpl;
@@ -122,12 +123,15 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
partitionsByTableId.put(TABLE_ID_1, partitionAccess1);
partitionsByTableId.put(TABLE_ID_2, partitionAccess2);
+ UUID snapshotId = UUID.randomUUID();
+
snapshot = new OutgoingSnapshot(
- UUID.randomUUID(),
+ snapshotId,
partitionKey,
partitionsByTableId,
mock(PartitionTxStateAccess.class),
- catalogService
+ catalogService,
+ new RaftSnapshotsMetricsSource()
);
snapshot.acquireMvLock();
@@ -157,12 +161,15 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
) {
when(txStateAccess.committedGroupConfiguration()).thenReturn(raftGroupConfiguration);
+ UUID snapshotId = UUID.randomUUID();
+
snapshot = new OutgoingSnapshot(
- UUID.randomUUID(),
+ snapshotId,
partitionKey,
Int2ObjectMaps.emptyMap(),
txStateAccess,
- catalogService
+ catalogService,
+ new RaftSnapshotsMetricsSource()
);
snapshot.acquireMvLock();
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
index b7283ad58e2..f6bb26e7853 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
@@ -37,6 +37,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMv
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
@@ -78,15 +79,20 @@ public class OutgoingSnapshotReaderTest extends
BaseIgniteAbstractTest {
return null;
}).when(outgoingSnapshotsManager).startOutgoingSnapshot(any(), any());
+ var partitionKey = new ZonePartitionKey(0, 0);
+
+ var snapshotMetricsSource = new RaftSnapshotsMetricsSource();
+
var snapshotStorage = new PartitionSnapshotStorage(
- new ZonePartitionKey(0, 0),
+ partitionKey,
mock(TopologyService.class),
outgoingSnapshotsManager,
txStateAccess,
catalogService,
mock(FailureProcessor.class),
mock(Executor.class),
- mock(LogStorageAccess.class)
+ mock(LogStorageAccess.class),
+ snapshotMetricsSource
);
snapshotStorage.addMvPartition(TABLE_ID_1, partitionAccess1);
@@ -100,7 +106,9 @@ public class OutgoingSnapshotReaderTest extends
BaseIgniteAbstractTest {
lenient().when(partitionAccess1.lastAppliedTerm()).thenReturn(2L);
lenient().when(partitionAccess2.lastAppliedTerm()).thenReturn(3L);
- try (var reader = new OutgoingSnapshotReader(UUID.randomUUID(),
snapshotStorage)) {
+ UUID snapshotId = UUID.randomUUID();
+
+ try (var reader = new OutgoingSnapshotReader(snapshotId,
snapshotStorage, snapshotMetricsSource)) {
SnapshotMeta meta = reader.load();
assertEquals(10L, meta.lastIncludedIndex());
assertEquals(1L, meta.lastIncludedTerm());
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
index ed2329e09f5..212f31bbef8 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotTxDataStreamingTest.java
@@ -51,6 +51,7 @@ import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotTxDa
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.TablePartitionKey;
@@ -107,12 +108,15 @@ class OutgoingSnapshotTxDataStreamingTest extends
BaseIgniteAbstractTest {
lenient().when(catalogService.catalog(anyInt())).thenReturn(mock(Catalog.class));
+ UUID snapshotId = UUID.randomUUID();
+
snapshot = new OutgoingSnapshot(
- UUID.randomUUID(),
+ snapshotId,
partitionKey,
singleton(1, partitionAccess),
txAccess,
- catalogService
+ catalogService,
+ new RaftSnapshotsMetricsSource()
);
}
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
index 8bafdceaf0e..a4dbc22532d 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotsManagerTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.catalog.CatalogService;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.metrics.RaftSnapshotsMetricsSource;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.TablePartitionKey;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -86,12 +87,15 @@ class OutgoingSnapshotsManagerTest extends
BaseIgniteAbstractTest {
when(catalogService.catalog(anyInt())).thenReturn(mock(Catalog.class));
+ UUID snapshotId = UUID.randomUUID();
+
OutgoingSnapshot snapshot = new OutgoingSnapshot(
- UUID.randomUUID(),
+ snapshotId,
partitionKey,
singleton(TABLE_ID, partitionAccess),
mock(PartitionTxStateAccess.class),
- catalogService
+ catalogService,
+ new RaftSnapshotsMetricsSource()
);
assertDoesNotThrow(() ->
manager.startOutgoingSnapshot(UUID.randomUUID(), snapshot));
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
index 34d6d674812..f4aff6292ca 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
@@ -70,7 +70,8 @@ class ItMetricControllerTest extends
ClusterPerClassIntegrationTest {
new MetricSource("resource.vacuum", true),
new MetricSource("placement-driver", true),
new MetricSource("clock.service", true),
- new MetricSource("index.builder", true)
+ new MetricSource("index.builder", true),
+ new MetricSource("raft.snapshots", true)
};
@Inject
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index fb06a696c06..027fe98e9f6 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -763,7 +763,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
txManager,
schemaManager,
dataStorageManager,
- outgoingSnapshotManager
+ outgoingSnapshotManager,
+ metricManager
);
TableManager tableManager = new TableManager(
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 959cbe912f9..b2b8c53eeb3 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1101,7 +1101,8 @@ public class IgniteImpl implements Ignite {
txManager,
schemaManager,
dataStorageMgr,
- outgoingSnapshotsManager
+ outgoingSnapshotsManager,
+ metricManager
);
systemViewManager.register(txManager);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index bfd49f6aeaf..0c2a49a435c 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -500,7 +500,9 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
indexMetaStorage = new IndexMetaStorage(catalogManager, lowWatermark,
metaStorageManager);
- dsm = createDataStorageManager();
+ var metricManager = new NoOpMetricManager();
+
+ dsm = createDataStorageManager(metricManager);
AlwaysSyncedSchemaSyncService schemaSyncService = new
AlwaysSyncedSchemaSyncService();
@@ -543,7 +545,8 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
txManager,
sm,
dsm,
- outgoingSnapshotManager
+ outgoingSnapshotManager,
+ metricManager
));
tableManager = new TableManager(
@@ -583,7 +586,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
new SystemPropertiesNodeProperties(),
minTimeCollectorService,
systemDistributedConfiguration,
- new NoOpMetricManager(),
+ metricManager,
TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
) {
@@ -648,7 +651,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
);
}
- private DataStorageManager createDataStorageManager() {
+ private DataStorageManager createDataStorageManager(MetricManager
metricManager) {
ConfigurationRegistry mockedRegistry =
mock(ConfigurationRegistry.class);
when(mockedRegistry.getConfiguration(NodeConfiguration.KEY)).thenReturn(nodeConfiguration);
@@ -658,7 +661,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
DataStorageManager manager = new DataStorageManager(
dataStorageModules.createStorageEngines(
NODE_NAME,
- mock(MetricManager.class),
+ metricManager,
mockedRegistry,
workDir,
null,