This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1a538038a9f IGNITE-27151 Add outgoing snapshot stats (#7219)
1a538038a9f is described below
commit 1a538038a9ff21a6b63fc28de7bc51c03bde06ff
Author: Ivan Zlenko <[email protected]>
AuthorDate: Wed Dec 17 12:55:03 2025 +0500
IGNITE-27151 Add outgoing snapshot stats (#7219)
---
.../ignite/internal/metrics/StopWatchTimer.java} | 18 +--
.../checkpoint/CheckpointMetricsTracker.java | 61 +++----
.../raft/snapshot/PartitionSnapshotStorage.java | 2 +
.../raft/snapshot/outgoing/OutgoingSnapshot.java | 35 +++-
.../snapshot/outgoing/OutgoingSnapshotStats.java | 180 +++++++++++++++++++++
.../outgoing/OutgoingSnapshotStatsTest.java | 127 +++++++++++++++
6 files changed, 382 insertions(+), 41 deletions(-)
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Duration.java
b/modules/core/src/main/java/org/apache/ignite/internal/metrics/StopWatchTimer.java
similarity index 81%
rename from
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Duration.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/metrics/StopWatchTimer.java
index 46433716c64..108caf6b39a 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Duration.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/metrics/StopWatchTimer.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+package org.apache.ignite.internal.metrics;
import java.util.concurrent.TimeUnit;
@@ -24,33 +24,33 @@ import java.util.concurrent.TimeUnit;
*
* <p>Not thread safe.</p>
*/
-class Duration {
+public class StopWatchTimer {
private long startNanos;
private long endNanos;
- /** Callback before the operation starts. */
- void onStart() {
+ /** Starts timer. */
+ public void start() {
startNanos = System.nanoTime();
}
- /** Callback after the end of the operation. */
- void onEnd() {
+ /** Ends timer. */
+ public void end() {
endNanos = System.nanoTime();
}
/** Returns the start time of the operation in nanos. */
- long startNanos() {
+ public long startNanos() {
return startNanos;
}
/** Returns the end time of the operation in nanos. */
- long endNanos() {
+ public long endNanos() {
- return startNanos;
+ return endNanos;
}
/** Returns the duration in the specified time unit. */
- long duration(TimeUnit timeUnit) {
+ public long duration(TimeUnit timeUnit) {
return timeUnit.convert(endNanos - startNanos, TimeUnit.NANOSECONDS);
}
}
diff --git
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTracker.java
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTracker.java
index fcd0f7d096d..8c8adbb075e 100644
---
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTracker.java
+++
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTracker.java
@@ -22,6 +22,7 @@ import static
java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.internal.metrics.StopWatchTimer;
import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
import org.apache.ignite.internal.util.FastTimestamps;
@@ -52,25 +53,25 @@ public class CheckpointMetricsTracker {
private final long startTimestamp =
FastTimestamps.coarseCurrentTimeMillis();
- private final Duration checkpointDuration = new Duration();
+ private final StopWatchTimer checkpointDuration = new StopWatchTimer();
- private final Duration writeLockWaitDuration = new Duration();
+ private final StopWatchTimer writeLockWaitDuration = new StopWatchTimer();
- private final Duration onBeforeCheckpointBeginDuration = new Duration();
+ private final StopWatchTimer onBeforeCheckpointBeginDuration = new
StopWatchTimer();
- private final Duration onMarkCheckpointBeginDuration = new Duration();
+ private final StopWatchTimer onMarkCheckpointBeginDuration = new
StopWatchTimer();
- private final Duration writeLockHoldDuration = new Duration();
+ private final StopWatchTimer writeLockHoldDuration = new StopWatchTimer();
- private final Duration pagesWriteDuration = new Duration();
+ private final StopWatchTimer pagesWriteDuration = new StopWatchTimer();
- private final Duration fsyncDuration = new Duration();
+ private final StopWatchTimer fsyncDuration = new StopWatchTimer();
- private final Duration replicatorLogSyncDuration = new Duration();
+ private final StopWatchTimer replicatorLogSyncDuration = new
StopWatchTimer();
- private final Duration splitAndSortCheckpointPagesDuration = new
Duration();
+ private final StopWatchTimer splitAndSortCheckpointPagesDuration = new
StopWatchTimer();
- private final Duration waitPageReplacement = new Duration();
+ private final StopWatchTimer waitPageReplacement = new StopWatchTimer();
/**
* Increments counter if copy on write page was written.
@@ -123,7 +124,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onCheckpointStart() {
- checkpointDuration.onStart();
+ checkpointDuration.start();
}
/**
@@ -132,7 +133,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onCheckpointEnd() {
- checkpointDuration.onEnd();
+ checkpointDuration.end();
}
/**
@@ -150,7 +151,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onWriteLockWaitStart() {
- writeLockWaitDuration.onStart();
+ writeLockWaitDuration.start();
}
/**
@@ -159,7 +160,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onWriteLockWaitEnd() {
- writeLockWaitDuration.onEnd();
+ writeLockWaitDuration.end();
}
/**
@@ -177,7 +178,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onBeforeCheckpointBeginStart() {
- onBeforeCheckpointBeginDuration.onStart();
+ onBeforeCheckpointBeginDuration.start();
}
/**
@@ -186,7 +187,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onBeforeCheckpointBeginEnd() {
- onBeforeCheckpointBeginDuration.onEnd();
+ onBeforeCheckpointBeginDuration.end();
}
/**
@@ -204,7 +205,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onMarkCheckpointBeginStart() {
- onMarkCheckpointBeginDuration.onStart();
+ onMarkCheckpointBeginDuration.start();
}
/**
@@ -213,7 +214,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onMarkCheckpointBeginEnd() {
- onMarkCheckpointBeginDuration.onEnd();
+ onMarkCheckpointBeginDuration.end();
}
/**
@@ -231,7 +232,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onPagesWriteStart() {
- pagesWriteDuration.onStart();
+ pagesWriteDuration.start();
}
/**
@@ -240,7 +241,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onPagesWriteEnd() {
- pagesWriteDuration.onEnd();
+ pagesWriteDuration.end();
}
/**
@@ -258,7 +259,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onFsyncStart() {
- fsyncDuration.onStart();
+ fsyncDuration.start();
}
/**
@@ -267,7 +268,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onFsyncEnd() {
- fsyncDuration.onEnd();
+ fsyncDuration.end();
}
/**
@@ -285,7 +286,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onSplitAndSortCheckpointPagesStart() {
- splitAndSortCheckpointPagesDuration.onStart();
+ splitAndSortCheckpointPagesDuration.start();
}
/**
@@ -294,7 +295,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onSplitAndSortCheckpointPagesEnd() {
- splitAndSortCheckpointPagesDuration.onEnd();
+ splitAndSortCheckpointPagesDuration.end();
}
/**
@@ -312,7 +313,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onReplicatorLogSyncStart() {
- replicatorLogSyncDuration.onStart();
+ replicatorLogSyncDuration.start();
}
/**
@@ -321,7 +322,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onReplicatorLogSyncEnd() {
- replicatorLogSyncDuration.onEnd();
+ replicatorLogSyncDuration.end();
}
/**
@@ -339,7 +340,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onWriteLockHoldStart() {
- writeLockHoldDuration.onStart();
+ writeLockHoldDuration.start();
}
/**
@@ -348,7 +349,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onWriteLockHoldEnd() {
- writeLockHoldDuration.onEnd();
+ writeLockHoldDuration.end();
}
/**
@@ -375,7 +376,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onWaitPageReplacementStart() {
- waitPageReplacement.onStart();
+ waitPageReplacement.start();
}
/**
@@ -384,7 +385,7 @@ public class CheckpointMetricsTracker {
* <p>Not thread safe.</p>
*/
public void onWaitPageReplacementEnd() {
- waitPageReplacement.onEnd();
+ waitPageReplacement.end();
}
/**
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
index a28d4bcd2a3..c7b8588f170 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
@@ -279,6 +279,8 @@ public class PartitionSnapshotStorage {
}
private void completeSnapshotOperation(UUID snapshotId) {
+ LOG.info("Finishing outgoing snapshot [partitionKey={},
snapshotId={}]", partitionKey, snapshotId);
+
synchronized (snapshotOperationLock) {
CompletableFuture<Void> operationFuture =
ongoingSnapshotOperations.remove(snapshotId);
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
index cc9cf2f52aa..026c6066822 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -143,6 +143,8 @@ public class OutgoingSnapshot {
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
private final AtomicBoolean closedGuard = new AtomicBoolean();
+ private final OutgoingSnapshotStats snapshotStats;
+
/**
* Creates a new instance.
*/
@@ -158,6 +160,7 @@ public class OutgoingSnapshot {
this.partitionsByTableId = partitionsByTableId;
this.txState = txState;
this.catalogService = catalogService;
+ this.snapshotStats = new OutgoingSnapshotStats(id, partitionKey);
}
/**
@@ -183,6 +186,8 @@ public class OutgoingSnapshot {
acquireMvLock();
try {
+ snapshotStats.onSnapshotStart();
+
int catalogVersion = catalogService.latestCatalogVersion();
List<PartitionMvStorageAccess> partitionStorages =
freezePartitionStorages();
@@ -217,6 +222,8 @@ public class OutgoingSnapshot {
assert config != null : "Configuration should never be null when
installing a snapshot";
+ snapshotStats.setSnapshotMeta(txState.lastAppliedIndex(),
txState.lastAppliedTerm(), config, catalogVersion);
+
return snapshotMetaAt(
txState.lastAppliedIndex(),
txState.lastAppliedTerm(),
@@ -230,6 +237,13 @@ public class OutgoingSnapshot {
assert config != null : "Configuration should never be null when
installing a snapshot";
+ snapshotStats.setSnapshotMeta(
+ partitionStorageWithMaxAppliedIndex.lastAppliedIndex(),
+ partitionStorageWithMaxAppliedIndex.lastAppliedTerm(),
+ config,
+ catalogVersion
+ );
+
return snapshotMetaAt(
partitionStorageWithMaxAppliedIndex.lastAppliedIndex(),
partitionStorageWithMaxAppliedIndex.lastAppliedTerm(),
@@ -320,6 +334,8 @@ public class OutgoingSnapshot {
long totalBatchSize = 0;
List<ResponseEntry> batch = new ArrayList<>();
+ snapshotStats.onStartMvDataBatchProcessing();
+
while (true) {
acquireMvLock();
@@ -331,6 +347,8 @@ public class OutgoingSnapshot {
// As out-of-order rows are added under the same lock that we
hold, and we always send OOO data first,
// exhausting the partition means that no MV data to send is
left, we are finished with it.
if (finishedMvData() || batchIsFull(request, totalBatchSize)) {
+ snapshotStats.onEndMvDataBatchProcessing();
+
return
PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotMvDataResponse()
.rows(batch)
.finish(finishedMvData())
@@ -359,7 +377,12 @@ public class OutgoingSnapshot {
}
rowEntries.add(rowEntry);
- totalBytesAfter += rowSizeInBytes(rowEntry.rowVersions());
+
+ long rowSize = rowSizeInBytes(rowEntry.rowVersions());
+
+
snapshotStats.onProcessOutOfOrderRow(rowEntry.rowVersions().size(), rowSize);
+
+ totalBytesAfter += rowSize;
}
return totalBytesAfter;
@@ -403,7 +426,11 @@ public class OutgoingSnapshot {
batch.add(rowEntry);
- totalBatchSize += rowSizeInBytes(rowEntry.rowVersions());
+ long rowSize = rowSizeInBytes(rowEntry.rowVersions());
+
+ totalBatchSize += rowSize;
+
+
snapshotStats.onProcessRegularRow(rowEntry.rowVersions().size(), rowSize);
}
}
@@ -634,6 +661,10 @@ public class OutgoingSnapshot {
busyLock.block();
+ snapshotStats.onSnapshotEnd();
+
+ snapshotStats.logSnapshotStats();
+
if (!finishedTxData) {
Cursor<IgniteBiTuple<UUID, TxMeta>> txCursor = txDataCursor;
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotStats.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotStats.java
new file mode 100644
index 00000000000..25e8c933fb2
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotStats.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.StopWatchTimer;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+
+/**
+ * Tracks important metrics for the outgoing snapshot.
+ *
+ * <p>Note: not thread safe. Thread safety guaranteed by operation's order in
{@link IncomingSnapshotCopier}.
+ */
+class OutgoingSnapshotStats {
+ private static final IgniteLogger LOG =
Loggers.forClass(OutgoingSnapshotStats.class);
+
+ long rowsSent;
+
+ long rowVersionsSent;
+
+ long totalBytesSent;
+
+ long outOfOrderRowsSent;
+
+ long outOfOrderVersionsSent;
+
+ long outOfOrderTotalBytesSent;
+
+ int totalBatches;
+
+ private final StopWatchTimer currentBatchTimer = new StopWatchTimer();
+
+ long combinedBatchDuration;
+
+ long minBatchDuration;
+
+ long maxBatchDuration;
+
+ final StopWatchTimer totalSnapshotTimer = new StopWatchTimer();
+
+ long lastIncludedIndex;
+
+ long lastIncludedTerm;
+
+ final List<String> peers = new ArrayList<>();
+
+ final List<String> oldPeers = new ArrayList<>();
+
+ final List<String> learners = new ArrayList<>();
+
+ final List<String> oldLearners = new ArrayList<>();
+
+ int catalogVersion;
+
+ private final UUID snapshotId;
+
+ private final PartitionKey partitionKey;
+
+ /** Constructor. */
+ OutgoingSnapshotStats(UUID snapshotId, PartitionKey partitionKey) {
+ this.snapshotId = snapshotId;
+ this.partitionKey = partitionKey;
+ }
+
+ void onSnapshotStart() {
+ totalSnapshotTimer.start();
+ }
+
+ void onSnapshotEnd() {
+ totalSnapshotTimer.end();
+ }
+
+ void setSnapshotMeta(long lastAppliedIndex, long lastAppliedTerm,
RaftGroupConfiguration config, int catalogVersion) {
+ this.lastIncludedIndex = lastAppliedIndex;
+ this.lastIncludedTerm = lastAppliedTerm;
+ this.catalogVersion = catalogVersion;
+
+ peers.addAll(config.peers());
+
+ List<String> oldPeers = config.oldPeers();
+ if (oldPeers != null) {
+ this.oldPeers.addAll(oldPeers);
+ }
+
+ learners.addAll(config.learners());
+
+ List<String> oldLearners = config.oldLearners();
+ if (oldLearners != null) {
+ this.oldLearners.addAll(oldLearners);
+ }
+ }
+
+ void onStartMvDataBatchProcessing() {
+ this.currentBatchTimer.start();
+ }
+
+ void onEndMvDataBatchProcessing() {
+ this.currentBatchTimer.end();
+
+ totalBatches++;
+
+ long batchDuration = currentBatchTimer.duration(MILLISECONDS);
+
+ minBatchDuration = totalBatches == 1
+ ? batchDuration
+ : Math.min(minBatchDuration, batchDuration);
+
+ maxBatchDuration = Math.max(maxBatchDuration, batchDuration);
+
+ combinedBatchDuration += batchDuration;
+ }
+
+ void onProcessOutOfOrderRow(long rowVersions, long totalBytes) {
+ outOfOrderRowsSent++;
+
+ outOfOrderVersionsSent += rowVersions;
+ outOfOrderTotalBytesSent += totalBytes;
+ }
+
+ void onProcessRegularRow(long rowVersions, long totalBytes) {
+ rowsSent++;
+
+ rowVersionsSent += rowVersions;
+ totalBytesSent += totalBytes;
+ }
+
+ /** Logs the accumulated snapshot statistics at INFO level; does nothing when INFO logging is disabled. */
+ void logSnapshotStats() {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Outgoing snapshot installation completed
[partitionKey={}, snapshotId={}, rows={}, rowVersions={}, totalBytes={}, "
+ + "outOfOrderRows={}, outOfOrderVersions={},
outOfOrderTotalBytes={}, totalBatches={},"
+ + " avgBatchProcessingTime={}ms,
minBatchProcessingTime={}ms, maxBatchProcessingTime={}ms,"
+ + " totalSnapshotInstallationTime={}ms,
lastIncludedIndex={}, lastIncludedTerm={}, peers=[{}], oldPeers=[{}],"
+ + " learners=[{}], oldLearners=[{}],
catalogVersion={}]",
+ partitionKey,
+ snapshotId,
+ rowsSent + outOfOrderRowsSent,
+ rowVersionsSent + outOfOrderVersionsSent,
+ totalBytesSent + outOfOrderTotalBytesSent,
+ outOfOrderRowsSent,
+ outOfOrderVersionsSent,
+ outOfOrderTotalBytesSent,
+ totalBatches,
+ totalBatches > 0 ? combinedBatchDuration / totalBatches :
0,
+ minBatchDuration,
+ maxBatchDuration,
+ totalSnapshotTimer.duration(MILLISECONDS),
+ lastIncludedIndex,
+ lastIncludedTerm,
+ peers,
+ oldPeers,
+ learners,
+ oldLearners,
+ catalogVersion
+ );
+ }
+ }
+}
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotStatsTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotStatsTest.java
new file mode 100644
index 00000000000..a7cd1800c0c
--- /dev/null
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotStatsTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link OutgoingSnapshotStats}.
+ */
+class OutgoingSnapshotStatsTest {
+ private static final long LAST_INCLUDED_INDEX = 1;
+
+ private static final long LAST_INCLUDED_TERM = 2;
+
+ private static final List<String> PEERS = List.of("node1", "node2");
+
+ private static final List<String> OLD_PEERS = List.of("node3", "node4");
+
+ private static final List<String> LEARNERS = List.of("node5", "node6");
+
+ private static final List<String> OLD_LEARNERS = List.of("node7", "node8");
+
+ private static final RaftGroupConfiguration CONFIG = new
RaftGroupConfiguration(
+ LAST_INCLUDED_INDEX,
+ LAST_INCLUDED_TERM,
+ 0,
+ 0,
+ PEERS,
+ LEARNERS,
+ OLD_PEERS,
+ OLD_LEARNERS
+ );
+
+ private static final int CATALOG_VERSION = 10;
+
+ private final OutgoingSnapshotStats stats = new
OutgoingSnapshotStats(UUID.randomUUID(), new ZonePartitionKey(0, 0));
+
+ @Test
+ void totalSnapshotDuration() throws InterruptedException {
+ stats.onSnapshotStart();
+
+ TimeUnit.MILLISECONDS.sleep(1);
+
+ stats.onSnapshotEnd();
+
+ assertThat(stats.totalSnapshotTimer.duration(TimeUnit.MILLISECONDS),
greaterThan(0L));
+ }
+
+ @Test
+ void batchProcessingStats() throws InterruptedException {
+ stats.onStartMvDataBatchProcessing();
+
+ TimeUnit.MILLISECONDS.sleep(1);
+
+ stats.onEndMvDataBatchProcessing();
+
+ assertThat(stats.totalBatches, is(1));
+
+ assertThat(stats.combinedBatchDuration, greaterThan(0L));
+ assertThat(stats.minBatchDuration, greaterThan(0L));
+ assertThat(stats.maxBatchDuration, greaterThan(0L));
+ }
+
+ @Test
+ void setSnapshotMeta() {
+ stats.setSnapshotMeta(LAST_INCLUDED_INDEX, LAST_INCLUDED_TERM, CONFIG,
CATALOG_VERSION);
+
+ assertThat(stats.lastIncludedIndex, is(LAST_INCLUDED_INDEX));
+ assertThat(stats.lastIncludedTerm, is(LAST_INCLUDED_TERM));
+
+ assertThat(stats.peers, containsInAnyOrder(PEERS.toArray()));
+ assertThat(stats.oldPeers, containsInAnyOrder(OLD_PEERS.toArray()));
+ assertThat(stats.learners, containsInAnyOrder(LEARNERS.toArray()));
+ assertThat(stats.oldLearners,
containsInAnyOrder(OLD_LEARNERS.toArray()));
+
+ assertThat(stats.catalogVersion, is(CATALOG_VERSION));
+ }
+
+ @Test
+ void onRowProcessing() {
+ long rowVersions = 10;
+ long bytes = 100;
+
+ stats.onProcessRegularRow(rowVersions, bytes);
+
+ assertThat(stats.rowsSent, is(1L));
+ assertThat(stats.rowVersionsSent, is(rowVersions));
+ assertThat(stats.totalBytesSent, is(bytes));
+ }
+
+ @Test
+ void onOutOfOrderRowProcessing() {
+ long rowVersions = 10;
+ long bytes = 100;
+
+ stats.onProcessOutOfOrderRow(rowVersions, bytes);
+
+ assertThat(stats.outOfOrderRowsSent, is(1L));
+ assertThat(stats.outOfOrderVersionsSent, is(rowVersions));
+ assertThat(stats.outOfOrderTotalBytesSent, is(bytes));
+ }
+}