This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a538038a9f IGNITE-27151 Add outgoing snapshot stats (#7219)
1a538038a9f is described below

commit 1a538038a9ff21a6b63fc28de7bc51c03bde06ff
Author: Ivan Zlenko <[email protected]>
AuthorDate: Wed Dec 17 12:55:03 2025 +0500

    IGNITE-27151 Add outgoing snapshot stats (#7219)
---
 .../ignite/internal/metrics/StopWatchTimer.java}   |  18 +--
 .../checkpoint/CheckpointMetricsTracker.java       |  61 +++----
 .../raft/snapshot/PartitionSnapshotStorage.java    |   2 +
 .../raft/snapshot/outgoing/OutgoingSnapshot.java   |  35 +++-
 .../snapshot/outgoing/OutgoingSnapshotStats.java   | 180 +++++++++++++++++++++
 .../outgoing/OutgoingSnapshotStatsTest.java        | 127 +++++++++++++++
 6 files changed, 382 insertions(+), 41 deletions(-)

diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Duration.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/metrics/StopWatchTimer.java
similarity index 81%
rename from 
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Duration.java
rename to 
modules/core/src/main/java/org/apache/ignite/internal/metrics/StopWatchTimer.java
index 46433716c64..108caf6b39a 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Duration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/metrics/StopWatchTimer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+package org.apache.ignite.internal.metrics;
 
 import java.util.concurrent.TimeUnit;
 
@@ -24,33 +24,33 @@ import java.util.concurrent.TimeUnit;
  *
  * <p>Not thread safe.</p>
  */
-class Duration {
+public class StopWatchTimer {
     private long startNanos;
 
     private long endNanos;
 
-    /** Callback before the operation starts. */
-    void onStart() {
+    /** Starts timer. */
+    public void start() {
         startNanos = System.nanoTime();
     }
 
-    /** Callback after the end of the operation. */
-    void onEnd() {
+    /** Ends timer. */
+    public void end() {
         endNanos = System.nanoTime();
     }
 
     /** Returns the start time of the operation in nanos. */
-    long startNanos() {
+    public long startNanos() {
         return startNanos;
     }
 
     /** Returns the end time of the operation in nanos. */
-    long endNanos() {
+    public long endNanos() {
         return startNanos;
     }
 
     /** Returns the duration in the specified time unit. */
-    long duration(TimeUnit timeUnit) {
+    public long duration(TimeUnit timeUnit) {
         return timeUnit.convert(endNanos - startNanos, TimeUnit.NANOSECONDS);
     }
 }
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTracker.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTracker.java
index fcd0f7d096d..8c8adbb075e 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTracker.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMetricsTracker.java
@@ -22,6 +22,7 @@ import static 
java.util.concurrent.atomic.AtomicIntegerFieldUpdater.newUpdater;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.ignite.internal.metrics.StopWatchTimer;
 import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
 import org.apache.ignite.internal.util.FastTimestamps;
 
@@ -52,25 +53,25 @@ public class CheckpointMetricsTracker {
 
     private final long startTimestamp = 
FastTimestamps.coarseCurrentTimeMillis();
 
-    private final Duration checkpointDuration = new Duration();
+    private final StopWatchTimer checkpointDuration = new StopWatchTimer();
 
-    private final Duration writeLockWaitDuration = new Duration();
+    private final StopWatchTimer writeLockWaitDuration = new StopWatchTimer();
 
-    private final Duration onBeforeCheckpointBeginDuration = new Duration();
+    private final StopWatchTimer onBeforeCheckpointBeginDuration = new 
StopWatchTimer();
 
-    private final Duration onMarkCheckpointBeginDuration = new Duration();
+    private final StopWatchTimer onMarkCheckpointBeginDuration = new 
StopWatchTimer();
 
-    private final Duration writeLockHoldDuration = new Duration();
+    private final StopWatchTimer writeLockHoldDuration = new StopWatchTimer();
 
-    private final Duration pagesWriteDuration = new Duration();
+    private final StopWatchTimer pagesWriteDuration = new StopWatchTimer();
 
-    private final Duration fsyncDuration = new Duration();
+    private final StopWatchTimer fsyncDuration = new StopWatchTimer();
 
-    private final Duration replicatorLogSyncDuration = new Duration();
+    private final StopWatchTimer replicatorLogSyncDuration = new 
StopWatchTimer();
 
-    private final Duration splitAndSortCheckpointPagesDuration = new 
Duration();
+    private final StopWatchTimer splitAndSortCheckpointPagesDuration = new 
StopWatchTimer();
 
-    private final Duration waitPageReplacement = new Duration();
+    private final StopWatchTimer waitPageReplacement = new StopWatchTimer();
 
     /**
      * Increments counter if copy on write page was written.
@@ -123,7 +124,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onCheckpointStart() {
-        checkpointDuration.onStart();
+        checkpointDuration.start();
     }
 
     /**
@@ -132,7 +133,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onCheckpointEnd() {
-        checkpointDuration.onEnd();
+        checkpointDuration.end();
     }
 
     /**
@@ -150,7 +151,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onWriteLockWaitStart() {
-        writeLockWaitDuration.onStart();
+        writeLockWaitDuration.start();
     }
 
     /**
@@ -159,7 +160,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onWriteLockWaitEnd() {
-        writeLockWaitDuration.onEnd();
+        writeLockWaitDuration.end();
     }
 
     /**
@@ -177,7 +178,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onBeforeCheckpointBeginStart() {
-        onBeforeCheckpointBeginDuration.onStart();
+        onBeforeCheckpointBeginDuration.start();
     }
 
     /**
@@ -186,7 +187,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onBeforeCheckpointBeginEnd() {
-        onBeforeCheckpointBeginDuration.onEnd();
+        onBeforeCheckpointBeginDuration.end();
     }
 
     /**
@@ -204,7 +205,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onMarkCheckpointBeginStart() {
-        onMarkCheckpointBeginDuration.onStart();
+        onMarkCheckpointBeginDuration.start();
     }
 
     /**
@@ -213,7 +214,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onMarkCheckpointBeginEnd() {
-        onMarkCheckpointBeginDuration.onEnd();
+        onMarkCheckpointBeginDuration.end();
     }
 
     /**
@@ -231,7 +232,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onPagesWriteStart() {
-        pagesWriteDuration.onStart();
+        pagesWriteDuration.start();
     }
 
     /**
@@ -240,7 +241,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onPagesWriteEnd() {
-        pagesWriteDuration.onEnd();
+        pagesWriteDuration.end();
     }
 
     /**
@@ -258,7 +259,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onFsyncStart() {
-        fsyncDuration.onStart();
+        fsyncDuration.start();
     }
 
     /**
@@ -267,7 +268,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onFsyncEnd() {
-        fsyncDuration.onEnd();
+        fsyncDuration.end();
     }
 
     /**
@@ -285,7 +286,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onSplitAndSortCheckpointPagesStart() {
-        splitAndSortCheckpointPagesDuration.onStart();
+        splitAndSortCheckpointPagesDuration.start();
     }
 
     /**
@@ -294,7 +295,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onSplitAndSortCheckpointPagesEnd() {
-        splitAndSortCheckpointPagesDuration.onEnd();
+        splitAndSortCheckpointPagesDuration.end();
     }
 
     /**
@@ -312,7 +313,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onReplicatorLogSyncStart() {
-        replicatorLogSyncDuration.onStart();
+        replicatorLogSyncDuration.start();
     }
 
     /**
@@ -321,7 +322,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onReplicatorLogSyncEnd() {
-        replicatorLogSyncDuration.onEnd();
+        replicatorLogSyncDuration.end();
     }
 
     /**
@@ -339,7 +340,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onWriteLockHoldStart() {
-        writeLockHoldDuration.onStart();
+        writeLockHoldDuration.start();
     }
 
     /**
@@ -348,7 +349,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onWriteLockHoldEnd() {
-        writeLockHoldDuration.onEnd();
+        writeLockHoldDuration.end();
     }
 
     /**
@@ -375,7 +376,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onWaitPageReplacementStart() {
-        waitPageReplacement.onStart();
+        waitPageReplacement.start();
     }
 
     /**
@@ -384,7 +385,7 @@ public class CheckpointMetricsTracker {
      * <p>Not thread safe.</p>
      */
     public void onWaitPageReplacementEnd() {
-        waitPageReplacement.onEnd();
+        waitPageReplacement.end();
     }
 
     /**
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
index a28d4bcd2a3..c7b8588f170 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
@@ -279,6 +279,8 @@ public class PartitionSnapshotStorage {
     }
 
     private void completeSnapshotOperation(UUID snapshotId) {
+        LOG.info("Finishing outgoing snapshot [partitionKey={}, 
snapshotId={}]", partitionKey, snapshotId);
+
         synchronized (snapshotOperationLock) {
             CompletableFuture<Void> operationFuture = 
ongoingSnapshotOperations.remove(snapshotId);
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
index cc9cf2f52aa..026c6066822 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -143,6 +143,8 @@ public class OutgoingSnapshot {
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
     private final AtomicBoolean closedGuard = new AtomicBoolean();
 
+    private final OutgoingSnapshotStats snapshotStats;
+
     /**
      * Creates a new instance.
      */
@@ -158,6 +160,7 @@ public class OutgoingSnapshot {
         this.partitionsByTableId = partitionsByTableId;
         this.txState = txState;
         this.catalogService = catalogService;
+        this.snapshotStats = new OutgoingSnapshotStats(id, partitionKey);
     }
 
     /**
@@ -183,6 +186,8 @@ public class OutgoingSnapshot {
         acquireMvLock();
 
         try {
+            snapshotStats.onSnapshotStart();
+
             int catalogVersion = catalogService.latestCatalogVersion();
 
             List<PartitionMvStorageAccess> partitionStorages = 
freezePartitionStorages();
@@ -217,6 +222,8 @@ public class OutgoingSnapshot {
 
             assert config != null : "Configuration should never be null when 
installing a snapshot";
 
+            snapshotStats.setSnapshotMeta(txState.lastAppliedIndex(), 
txState.lastAppliedTerm(), config, catalogVersion);
+
             return snapshotMetaAt(
                     txState.lastAppliedIndex(),
                     txState.lastAppliedTerm(),
@@ -230,6 +237,13 @@ public class OutgoingSnapshot {
 
             assert config != null : "Configuration should never be null when 
installing a snapshot";
 
+            snapshotStats.setSnapshotMeta(
+                    partitionStorageWithMaxAppliedIndex.lastAppliedIndex(),
+                    partitionStorageWithMaxAppliedIndex.lastAppliedTerm(),
+                    config,
+                    catalogVersion
+            );
+
             return snapshotMetaAt(
                     partitionStorageWithMaxAppliedIndex.lastAppliedIndex(),
                     partitionStorageWithMaxAppliedIndex.lastAppliedTerm(),
@@ -320,6 +334,8 @@ public class OutgoingSnapshot {
         long totalBatchSize = 0;
         List<ResponseEntry> batch = new ArrayList<>();
 
+        snapshotStats.onStartMvDataBatchProcessing();
+
         while (true) {
             acquireMvLock();
 
@@ -331,6 +347,8 @@ public class OutgoingSnapshot {
                 // As out-of-order rows are added under the same lock that we 
hold, and we always send OOO data first,
                 // exhausting the partition means that no MV data to send is 
left, we are finished with it.
                 if (finishedMvData() || batchIsFull(request, totalBatchSize)) {
+                    snapshotStats.onEndMvDataBatchProcessing();
+
                     return 
PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotMvDataResponse()
                             .rows(batch)
                             .finish(finishedMvData())
@@ -359,7 +377,12 @@ public class OutgoingSnapshot {
             }
 
             rowEntries.add(rowEntry);
-            totalBytesAfter += rowSizeInBytes(rowEntry.rowVersions());
+
+            long rowSize = rowSizeInBytes(rowEntry.rowVersions());
+
+            
snapshotStats.onProcessOutOfOrderRow(rowEntry.rowVersions().size(), rowSize);
+
+            totalBytesAfter += rowSize;
         }
 
         return totalBytesAfter;
@@ -403,7 +426,11 @@ public class OutgoingSnapshot {
 
                 batch.add(rowEntry);
 
-                totalBatchSize += rowSizeInBytes(rowEntry.rowVersions());
+                long rowSize = rowSizeInBytes(rowEntry.rowVersions());
+
+                totalBatchSize += rowSize;
+
+                
snapshotStats.onProcessRegularRow(rowEntry.rowVersions().size(), rowSize);
             }
         }
 
@@ -634,6 +661,10 @@ public class OutgoingSnapshot {
 
         busyLock.block();
 
+        snapshotStats.onSnapshotEnd();
+
+        snapshotStats.logSnapshotStats();
+
         if (!finishedTxData) {
             Cursor<IgniteBiTuple<UUID, TxMeta>> txCursor = txDataCursor;
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotStats.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotStats.java
new file mode 100644
index 00000000000..25e8c933fb2
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotStats.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metrics.StopWatchTimer;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+
+/**
+ * Tracks important metrics for the outgoing snapshot.
+ *
+ * <p>Note: not thread safe. Thread safety guaranteed by operation's order in 
{@link IncomingSnapshotCopier}.
+ */
+class OutgoingSnapshotStats {
+    private static final IgniteLogger LOG = 
Loggers.forClass(OutgoingSnapshotStats.class);
+
+    long rowsSent;
+
+    long rowVersionsSent;
+
+    long totalBytesSent;
+
+    long outOfOrderRowsSent;
+
+    long outOfOrderVersionsSent;
+
+    long outOfOrderTotalBytesSent;
+
+    int totalBatches;
+
+    private final StopWatchTimer currentBatchTimer = new StopWatchTimer();
+
+    long combinedBatchDuration;
+
+    long minBatchDuration;
+
+    long maxBatchDuration;
+
+    final StopWatchTimer totalSnapshotTimer = new StopWatchTimer();
+
+    long lastIncludedIndex;
+
+    long lastIncludedTerm;
+
+    final List<String> peers = new ArrayList<>();
+
+    final List<String> oldPeers = new ArrayList<>();
+
+    final List<String> learners = new ArrayList<>();
+
+    final List<String> oldLearners = new ArrayList<>();
+
+    int catalogVersion;
+
+    private final UUID snapshotId;
+
+    private final PartitionKey partitionKey;
+
+    /** Constructor. */
+    OutgoingSnapshotStats(UUID snapshotId, PartitionKey partitionKey) {
+        this.snapshotId = snapshotId;
+        this.partitionKey = partitionKey;
+    }
+
+    void onSnapshotStart() {
+        totalSnapshotTimer.start();
+    }
+
+    void onSnapshotEnd() {
+        totalSnapshotTimer.end();
+    }
+
+    void setSnapshotMeta(long lastAppliedIndex, long lastAppliedTerm, 
RaftGroupConfiguration config, int catalogVersion) {
+        this.lastIncludedIndex = lastAppliedIndex;
+        this.lastIncludedTerm = lastAppliedTerm;
+        this.catalogVersion = catalogVersion;
+
+        peers.addAll(config.peers());
+
+        List<String> oldPeers = config.oldPeers();
+        if (oldPeers != null) {
+            this.oldPeers.addAll(oldPeers);
+        }
+
+        learners.addAll(config.learners());
+
+        List<String> oldLearners = config.oldLearners();
+        if (oldLearners != null) {
+            this.oldLearners.addAll(oldLearners);
+        }
+    }
+
+    void onStartMvDataBatchProcessing() {
+        this.currentBatchTimer.start();
+    }
+
+    void onEndMvDataBatchProcessing() {
+        this.currentBatchTimer.end();
+
+        totalBatches++;
+
+        long batchDuration = currentBatchTimer.duration(MILLISECONDS);
+
+        minBatchDuration = totalBatches == 1
+                ? batchDuration
+                : Math.min(minBatchDuration, batchDuration);
+
+        maxBatchDuration = Math.max(maxBatchDuration, batchDuration);
+
+        combinedBatchDuration += batchDuration;
+    }
+
+    void onProcessOutOfOrderRow(long rowVersions, long totalBytes) {
+        outOfOrderRowsSent++;
+
+        outOfOrderVersionsSent += rowVersions;
+        outOfOrderTotalBytesSent += totalBytes;
+    }
+
+    void onProcessRegularRow(long rowVersions, long totalBytes) {
+        rowsSent++;
+
+        rowVersionsSent += rowVersions;
+        totalBytesSent += totalBytes;
+    }
+
+    void logSnapshotStats() {
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Outgoing snapshot installation completed 
[partitionKey={}, snapshotId={}, rows={}, rowVersions={}, totalBytes={}, "
+                            + " outOfOrderRows={}, outOfOrderVersions={}, 
outOfOrderTotalBytes={}, totalBatches={},"
+                            + " avgBatchProcessingTime={}ms, 
minBatchProcessingTime={}ms, maxBatchProcessingTime={}ms,"
+                            + " totalSnapshotInstallationTime={}ms, 
lastIncludedIndex={}, lastIncludedTerm={}, peers=[{}], oldPeers=[{}],"
+                            + " learners=[{}], oldLearners=[{}], 
catalogVersion={}]",
+                    partitionKey,
+                    snapshotId,
+                    rowsSent + outOfOrderRowsSent,
+                    rowVersionsSent + outOfOrderVersionsSent,
+                    totalBytesSent + outOfOrderTotalBytesSent,
+                    outOfOrderRowsSent,
+                    outOfOrderVersionsSent,
+                    outOfOrderTotalBytesSent,
+                    totalBatches,
+                    totalBatches > 0 ? combinedBatchDuration / totalBatches : 
0,
+                    minBatchDuration,
+                    maxBatchDuration,
+                    totalSnapshotTimer.duration(MILLISECONDS),
+                    lastIncludedIndex,
+                    lastIncludedTerm,
+                    peers,
+                    oldPeers,
+                    learners,
+                    oldLearners,
+                    catalogVersion
+            );
+        }
+    }
+}
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotStatsTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotStatsTest.java
new file mode 100644
index 00000000000..a7cd1800c0c
--- /dev/null
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotStatsTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link OutgoingSnapshotStats}.
+ */
+class OutgoingSnapshotStatsTest {
+    private static final long LAST_INCLUDED_INDEX = 1;
+
+    private static final long LAST_INCLUDED_TERM = 2;
+
+    private static final List<String> PEERS = List.of("node1", "node2");
+
+    private static final List<String> OLD_PEERS = List.of("node3", "node4");
+
+    private static final List<String> LEARNERS = List.of("node5", "node6");
+
+    private static final List<String> OLD_LEARNERS = List.of("node7", "node8");
+
+    private static final RaftGroupConfiguration CONFIG = new 
RaftGroupConfiguration(
+            LAST_INCLUDED_INDEX,
+            LAST_INCLUDED_TERM,
+            0,
+            0,
+            PEERS,
+            LEARNERS,
+            OLD_PEERS,
+            OLD_LEARNERS
+    );
+
+    private static final int CATALOG_VERSION = 10;
+
+    private final OutgoingSnapshotStats stats = new 
OutgoingSnapshotStats(UUID.randomUUID(), new ZonePartitionKey(0, 0));
+
+    @Test
+    void totalSnapshotDuration() throws InterruptedException {
+        stats.onSnapshotStart();
+
+        TimeUnit.MILLISECONDS.sleep(1);
+
+        stats.onSnapshotEnd();
+
+        assertThat(stats.totalSnapshotTimer.duration(TimeUnit.MILLISECONDS), 
greaterThan(0L));
+    }
+
+    @Test
+    void batchProcessingStats() throws InterruptedException {
+        stats.onStartMvDataBatchProcessing();
+
+        TimeUnit.MILLISECONDS.sleep(1);
+
+        stats.onEndMvDataBatchProcessing();
+
+        assertThat(stats.totalBatches, is(1));
+
+        assertThat(stats.combinedBatchDuration, greaterThan(0L));
+        assertThat(stats.minBatchDuration, greaterThan(0L));
+        assertThat(stats.maxBatchDuration, greaterThan(0L));
+    }
+
+    @Test
+    void setSnapshotMeta() {
+        stats.setSnapshotMeta(LAST_INCLUDED_INDEX, LAST_INCLUDED_TERM, CONFIG, 
CATALOG_VERSION);
+
+        assertThat(stats.lastIncludedIndex, is(LAST_INCLUDED_INDEX));
+        assertThat(stats.lastIncludedTerm, is(LAST_INCLUDED_TERM));
+
+        assertThat(stats.peers, containsInAnyOrder(PEERS.toArray()));
+        assertThat(stats.oldPeers, containsInAnyOrder(OLD_PEERS.toArray()));
+        assertThat(stats.learners, containsInAnyOrder(LEARNERS.toArray()));
+        assertThat(stats.oldLearners, 
containsInAnyOrder(OLD_LEARNERS.toArray()));
+
+        assertThat(stats.catalogVersion, is(CATALOG_VERSION));
+    }
+
+    @Test
+    void onRowProcessing() {
+        long rowVersions = 10;
+        long bytes = 100;
+
+        stats.onProcessRegularRow(rowVersions, bytes);
+
+        assertThat(stats.rowsSent, is(1L));
+        assertThat(stats.rowVersionsSent, is(rowVersions));
+        assertThat(stats.totalBytesSent, is(bytes));
+    }
+
+    @Test
+    void onOutOfOrderRowProcessing() {
+        long rowVersions = 10;
+        long bytes = 100;
+
+        stats.onProcessOutOfOrderRow(rowVersions, bytes);
+
+        assertThat(stats.outOfOrderRowsSent, is(1L));
+        assertThat(stats.outOfOrderVersionsSent, is(rowVersions));
+        assertThat(stats.outOfOrderTotalBytesSent, is(bytes));
+    }
+}

Reply via email to