This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c45763d2c2a IGNITE-24522 Fix races on Raft snapshot creation and adding a table to a zone. (#5242)
c45763d2c2a is described below

commit c45763d2c2ad32ff84d1dd6a85303093f8f585d5
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Feb 18 15:00:49 2025 +0200

    IGNITE-24522 Fix races on Raft snapshot creation and adding a table to a zone. (#5242)
---
 .../snapshot/PartitionSnapshotStorageFactory.java  |   1 -
 .../outgoing/MvPartitionDeliveryState.java         |  45 ++++--
 .../raft/snapshot/outgoing/OutgoingSnapshot.java   | 158 +++++++++++++++------
 .../outgoing/MvPartitionDeliveryStateTest.java     |  30 +++-
 .../outgoing/OutgoingSnapshotCommonTest.java       |  13 +-
 .../OutgoingSnapshotMvDataStreamingTest.java       | 151 ++++++++++++++++----
 .../outgoing/OutgoingSnapshotReaderTest.java       |  33 +++--
 .../SnapshotAwarePartitionDataStorageTest.java     |   8 +-
 .../SnapshotAwarePartitionDataStorage.java         |   2 +-
 9 files changed, 328 insertions(+), 113 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
index 07a7119f447..0526a77ef31 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
@@ -92,7 +92,6 @@ public class PartitionSnapshotStorageFactory implements 
SnapshotStorageFactory {
      * Adds a given table partition storage to the snapshot storage, managed 
by this factory.
      */
     public void addMvPartition(int tableId, PartitionMvStorageAccess 
partition) {
-        // FIXME: there are possible races with table creation, see 
https://issues.apache.org/jira/browse/IGNITE-24522
         PartitionMvStorageAccess prev = partitionsByTableId.put(tableId, 
partition);
 
         assert prev == null : "Partition storage for table ID " + tableId + " 
already exists.";
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryState.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryState.java
index a13fc12ba07..a18052bee63 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryState.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryState.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
 
-import java.util.Collection;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
 import java.util.Iterator;
+import java.util.List;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import org.apache.ignite.internal.storage.RowId;
 import org.jetbrains.annotations.Nullable;
@@ -29,22 +31,29 @@ import org.jetbrains.annotations.Nullable;
 class MvPartitionDeliveryState {
     private final Iterator<PartitionMvStorageAccess> partitionStoragesIterator;
 
-    /**
-     * Current row ID within the current partition storage. Can be {@code 
null} only if the snapshot has delivered all possible data.
-     */
+    /** Current row ID within the current partition storage. */
     @Nullable
     private RowId currentRowId;
 
-    /**
-     * Current partition storage. Can be {@code null} only if the snapshot has 
delivered all possible data.
-     */
+    /** Current partition storage. */
     @Nullable
     private PartitionMvStorageAccess currentPartitionStorage;
 
-    MvPartitionDeliveryState(Collection<PartitionMvStorageAccess> 
partitionStorages) {
+    private final IntSet tableIds;
+
+    private boolean isStarted = false;
+
+    /**
+     * Creates a new state. The state is initially positioned before the first 
row of the first storage.
+     *
+     * @param partitionStorages Partition storages to iterate over. They 
<b>must</b> be sorted by table ID in ascending order.
+     */
+    MvPartitionDeliveryState(List<PartitionMvStorageAccess> partitionStorages) 
{
         this.partitionStoragesIterator = partitionStorages.iterator();
 
-        advance();
+        tableIds = new IntOpenHashSet(partitionStorages.size());
+
+        partitionStorages.forEach(storage -> tableIds.add(storage.tableId()));
     }
 
     RowId currentRowId() {
@@ -63,14 +72,30 @@ class MvPartitionDeliveryState {
         return currentPartitionStorage().tableId();
     }
 
+    /**
+     * Returns {@code true} if the given table ID is in the range of tables 
that this state iterates over.
+     */
+    boolean isGoingToBeDelivered(int tableId) {
+        return tableIds.contains(tableId);
+    }
+
     /**
      * Returns {@code true} if all data for the snapshot has been retrieved.
      */
     boolean isExhausted() {
-        return currentPartitionStorage == null;
+        return currentPartitionStorage == null && 
!partitionStoragesIterator.hasNext();
+    }
+
+    /**
+     * Returns {@code true} if the {@link #advance()} method has been called 
at least once.
+     */
+    boolean hasIterationStarted() {
+        return isStarted;
     }
 
     void advance() {
+        isStarted = true;
+
         while (true) {
             if (currentPartitionStorage == null) {
                 if (!partitionStoragesIterator.hasNext()) {
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
index a87b83f4ef0..cea03d1bba5 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -18,15 +18,17 @@
 package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
 
 import static java.lang.Math.max;
+import static java.util.Comparator.comparingInt;
 import static java.util.Comparator.comparingLong;
+import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.SnapshotMetaUtils.collectNextRowIdToBuildIndexes;
 import static 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.SnapshotMetaUtils.snapshotMetaAt;
 
-import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -34,7 +36,9 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -52,6 +56,7 @@ import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDa
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
+import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
 import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -61,7 +66,6 @@ import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.message.TxMetaMessage;
 import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -83,15 +87,15 @@ public class OutgoingSnapshot {
 
     private final PartitionKey partitionKey;
 
-    private final Int2ObjectSortedMap<PartitionMvStorageAccess> 
partitionsByTableId;
+    private final Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId;
 
     private final PartitionTxStateAccess txState;
 
     private final CatalogService catalogService;
 
     /**
-     * Lock that is used for mutual exclusion of MV snapshot reading (by this 
class) and threads that write MV data to the same
-     * partition (currently, via {@link PartitionDataStorage}).
+     * Lock that is used for mutual exclusion of MV snapshot reading (by this 
class) and threads that write MV data to the same partition
+     * (currently, via {@link PartitionDataStorage}).
      */
     private final ReentrantLock mvOperationsLock = new ReentrantLock();
 
@@ -100,30 +104,42 @@ public class OutgoingSnapshot {
     private volatile PartitionSnapshotMeta frozenMeta;
 
     /**
-     * {@link RowId}s for which the corresponding rows were sent out of order 
(relative to the order in which this
-     * snapshot sends rows), hence they must be skipped when sending rows 
normally.
+     * {@link RowId}s for which the corresponding rows were sent out of order 
(relative to the order in which this snapshot sends rows),
+     * hence they must be skipped when sending rows normally.
+     *
+     * <p>Multi-threaded access is guarded by {@link #mvOperationsLock}.
      */
-    private final Set<RowId> rowIdsToSkip = new ConcurrentHashSet<>();
+    private final Set<RowId> rowIdsToSkip = new HashSet<>();
 
     // TODO: IGNITE-18018 - manage queue size
     /**
-     * Rows that need to be sent out of order (relative to the order in which 
this snapshot sends rows).
-     * Versions inside rows are in oldest-to-newest order.
+     * Rows that need to be sent out of order (relative to the order in which 
this snapshot sends rows). Versions inside rows are in
+     * oldest-to-newest order.
+     *
+     * <p>Multi-threaded access is guarded by {@link #mvOperationsLock}.
      */
     private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData 
= new ArrayDeque<>();
 
     /**
-     * Current delivery state of MV partition data. Can be {@code null} only 
if the delivery has not started yet.
+     * Current delivery state of MV partition data.
+     *
+     * <p>Multi-threaded access is guarded by {@link #mvOperationsLock}.
      */
     @Nullable
     private MvPartitionDeliveryState mvPartitionDeliveryState;
 
+    /**
+     * Cursor over TX data.
+     *
+     * <p>Inter-thread visibility is provided by accessing {@link 
#finishedTxData} in a correct order.
+     */
+    @Nullable
     private Cursor<IgniteBiTuple<UUID, TxMeta>> txDataCursor;
 
     /**
      * This becomes {@code true} as soon as we exhaust TX data in the 
partition.
      */
-    private boolean finishedTxData = false;
+    private volatile boolean finishedTxData;
 
     private volatile boolean closed = false;
 
@@ -139,8 +155,7 @@ public class OutgoingSnapshot {
     ) {
         this.id = id;
         this.partitionKey = partitionKey;
-        // Create a sorted copy in order to process partitions in a 
deterministic order.
-        this.partitionsByTableId = new 
Int2ObjectAVLTreeMap<>(partitionsByTableId);
+        this.partitionsByTableId = partitionsByTableId;
         this.txState = txState;
         this.catalogService = catalogService;
     }
@@ -168,18 +183,27 @@ public class OutgoingSnapshot {
         acquireMvLock();
 
         try {
-            frozenMeta = takeSnapshotMeta();
+            int catalogVersion = catalogService.latestCatalogVersion();
+
+            List<PartitionMvStorageAccess> partitionStorages = 
freezePartitionStorages(catalogVersion);
+
+            frozenMeta = takeSnapshotMeta(catalogVersion, partitionStorages);
 
             txDataCursor = txState.getAllTxMeta();
+
+            // Write the flag to publish the TX cursor (in the memory model 
sense).
+            finishedTxData = false;
+
+            mvPartitionDeliveryState = new 
MvPartitionDeliveryState(partitionStorages);
         } finally {
             releaseMvLock();
         }
     }
 
-    private PartitionSnapshotMeta takeSnapshotMeta() {
+    private PartitionSnapshotMeta takeSnapshotMeta(int catalogVersion, 
Collection<PartitionMvStorageAccess> partitionStorages) {
         // TODO: partitionsByTableId will be empty for zones without tables, 
need another way to get meta in that case,
         //  see https://issues.apache.org/jira/browse/IGNITE-24517
-        PartitionMvStorageAccess partitionStorageWithMaxAppliedIndex = 
partitionsByTableId.values().stream()
+        PartitionMvStorageAccess partitionStorageWithMaxAppliedIndex = 
partitionStorages.stream()
                 .max(comparingLong(PartitionMvStorageAccess::lastAppliedIndex))
                 .orElseThrow();
 
@@ -187,11 +211,9 @@ public class OutgoingSnapshot {
 
         assert config != null : "Configuration should never be null when 
installing a snapshot";
 
-        int catalogVersion = catalogService.latestCatalogVersion();
-
         Map<Integer, UUID> nextRowIdToBuildByIndexId = 
collectNextRowIdToBuildIndexes(
                 catalogService,
-                partitionsByTableId.values(),
+                partitionStorages,
                 catalogVersion
         );
 
@@ -207,6 +229,29 @@ public class OutgoingSnapshot {
         );
     }
 
+    private List<PartitionMvStorageAccess> freezePartitionStorages(int 
catalogVersion) {
+        Catalog catalog = catalogService.catalog(catalogVersion);
+
+        if (partitionKey instanceof ZonePartitionKey) {
+            int zoneId = ((ZonePartitionKey) partitionKey).zoneId();
+
+            return partitionsByTableId.values().stream()
+                    .filter(storage -> {
+                        CatalogTableDescriptor tableDescriptor = 
catalog.table(storage.tableId());
+
+                        return tableDescriptor != null && 
tableDescriptor.zoneId() == zoneId;
+                    })
+                    .sorted(comparingInt(PartitionMvStorageAccess::tableId))
+                    .collect(toList());
+        } else {
+            // TODO: remove this clause, see 
https://issues.apache.org/jira/browse/IGNITE-22522
+            // For a non-colocation case we always have a single entry in this 
map.
+            assert partitionsByTableId.size() == 1;
+
+            return List.copyOf(partitionsByTableId.values());
+        }
+    }
+
     /**
      * Returns metadata corresponding to this snapshot.
      *
@@ -257,8 +302,6 @@ public class OutgoingSnapshot {
             return logThatAlreadyClosedAndReturnNull();
         }
 
-        assert !finishedMvData() : "MV data sending has already been finished";
-
         long totalBatchSize = 0;
         List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>();
 
@@ -273,17 +316,15 @@ public class OutgoingSnapshot {
                 // As out-of-order rows are added under the same lock that we 
hold, and we always send OOO data first,
                 // exhausting the partition means that no MV data to send is 
left, we are finished with it.
                 if (finishedMvData() || batchIsFull(request, totalBatchSize)) {
-                    break;
+                    return 
PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotMvDataResponse()
+                            .rows(batch)
+                            .finish(finishedMvData())
+                            .build();
                 }
             } finally {
                 releaseMvLock();
             }
         }
-
-        return PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotMvDataResponse()
-                .rows(batch)
-                .finish(finishedMvData())
-                .build();
     }
 
     private long fillWithOutOfOrderRows(
@@ -331,11 +372,9 @@ public class OutgoingSnapshot {
             return totalBatchSize;
         }
 
-        if (mvPartitionDeliveryState == null) {
-            mvPartitionDeliveryState = new 
MvPartitionDeliveryState(partitionsByTableId.values());
-        } else {
-            mvPartitionDeliveryState.advance();
-        }
+        assert mvPartitionDeliveryState != null : "Snapshot scope has not been 
frozen.";
+
+        mvPartitionDeliveryState.advance();
 
         if (!finishedMvData()) {
             RowId rowId = mvPartitionDeliveryState.currentRowId();
@@ -426,6 +465,12 @@ public class OutgoingSnapshot {
 
         List<IgniteBiTuple<UUID, TxMeta>> rows = new ArrayList<>();
 
+        boolean finishedTxData = this.finishedTxData;
+
+        Cursor<IgniteBiTuple<UUID, TxMeta>> txDataCursor = this.txDataCursor;
+
+        assert txDataCursor != null : "Snapshot scope has not been frozen.";
+
         while (!finishedTxData && rows.size() < 
request.maxTransactionsInBatch()) {
             if (txDataCursor.hasNext()) {
                 rows.add(txDataCursor.next());
@@ -435,6 +480,8 @@ public class OutgoingSnapshot {
             }
         }
 
+        this.finishedTxData = finishedTxData;
+
         return buildTxDataResponse(rows, finishedTxData);
     }
 
@@ -478,13 +525,14 @@ public class OutgoingSnapshot {
     }
 
     /**
-     * Whether this snapshot is finished with sending MV data (i.e. it already 
sent all the MV data and is not going to send anything else).
-     *
-     * <p>Must be called under snapshot lock.
+     * Returns {@code true} if this snapshot is finished with sending MV data 
(i.e. it already sent all MV data and is not going to send
+     * anything else).
      *
-     * @return {@code true} if finished.
+     * <p>Must be called under {@link #mvOperationsLock}.
      */
     private boolean finishedMvData() {
+        assert mvOperationsLock.isLocked() : "MV operations lock must be 
acquired!";
+
         return mvPartitionDeliveryState != null && 
mvPartitionDeliveryState.isExhausted();
     }
 
@@ -503,26 +551,39 @@ public class OutgoingSnapshot {
     }
 
     /**
-     * Returns {@code true} if the given {@link RowId} does not interfere with 
the rows that this snapshot is going
-     * to send in the normal snapshot rows sending order.
+     * Returns {@code true} if the given {@link RowId} does not interfere with 
the rows that this snapshot is going to send in the normal
+     * snapshot rows sending order.
      *
      * <p>Must be called under MV data snapshot lock.
      *
      * @param rowId RowId.
-     * @return {@code true} if the given RowId is already passed by the 
snapshot in normal rows sending order.
+     * @return {@code true} if the given RowId is already passed by the 
snapshot in normal rows sending order or if the RowId belongs to
+     *     a table that is not a part of this snapshot at all.
      */
-    public boolean alreadyPassed(int tableId, RowId rowId) {
+    public boolean alreadyPassedOrIrrelevant(int tableId, RowId rowId) {
         assert mvOperationsLock.isLocked() : "MV operations lock must be 
acquired!";
 
         if (mvPartitionDeliveryState == null) {
-            // We haven't started sending MV data yet.
+            // We haven't even frozen the snapshot scope yet.
             return false;
         }
 
-        if (finishedMvData()) {
+        return !mvPartitionDeliveryState.isGoingToBeDelivered(tableId) || 
alreadyPassed(tableId, rowId);
+    }
+
+    private boolean alreadyPassed(int tableId, RowId rowId) {
+        assert mvPartitionDeliveryState != null;
+
+        if (mvPartitionDeliveryState.isExhausted()) {
+            // We have already finished delivering all data.
             return true;
         }
 
+        if (!mvPartitionDeliveryState.hasIterationStarted()) {
+            // We haven't started streaming the data yet.
+            return false;
+        }
+
         if (tableId == mvPartitionDeliveryState.currentTableId()) {
             // 'currentRowId' here has already been sent, hence the non-strict 
comparison.
             return rowId.compareTo(mvPartitionDeliveryState.currentRowId()) <= 
0;
@@ -552,10 +613,13 @@ public class OutgoingSnapshot {
      * Closes the snapshot releasing the underlying resources.
      */
     public void close() {
-        Cursor<IgniteBiTuple<UUID, TxMeta>> txCursor = txDataCursor;
+        if (!finishedTxData) {
+            Cursor<IgniteBiTuple<UUID, TxMeta>> txCursor = txDataCursor;
 
-        if (txCursor != null) {
-            closeLoggingProblems(txCursor);
+            if (txCursor != null) {
+                closeLoggingProblems(txCursor);
+                finishedTxData = true;
+            }
         }
 
         closed = true;
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryStateTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryStateTest.java
index 3cae52762c4..6ec8d503127 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryStateTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryStateTest.java
@@ -42,6 +42,7 @@ class MvPartitionDeliveryStateTest extends 
BaseIgniteAbstractTest {
     void emptyStateIsExhausted() {
         var state = new MvPartitionDeliveryState(List.of());
 
+        assertThat(state.hasIterationStarted(), is(false));
         assertThat(state.isExhausted(), is(true));
     }
 
@@ -74,29 +75,35 @@ class MvPartitionDeliveryStateTest extends 
BaseIgniteAbstractTest {
 
         var state = new MvPartitionDeliveryState(storages);
 
+        assertThat(state.hasIterationStarted(), is(false));
+        assertThat(state.isExhausted(), is(false));
+
         for (int i = 0; i < storages.size(); i++) {
-            assertThat(state.isExhausted(), is(false));
+            state.advance();
 
+            assertThat(state.hasIterationStarted(), is(true));
+            assertThat(state.isExhausted(), is(false));
             assertThat(state.currentTableId(), is(i));
             assertThat(state.currentRowId(), is(rowIdsByTable.get(i).get(0)));
 
             state.advance();
 
+            assertThat(state.hasIterationStarted(), is(true));
             assertThat(state.isExhausted(), is(false));
-
             assertThat(state.currentTableId(), is(i));
             assertThat(state.currentRowId(), is(rowIdsByTable.get(i).get(1)));
 
             state.advance();
 
+            assertThat(state.hasIterationStarted(), is(true));
             assertThat(state.isExhausted(), is(false));
-
             assertThat(state.currentTableId(), is(i));
             assertThat(state.currentRowId(), is(rowIdsByTable.get(i).get(2)));
-
-            state.advance();
         }
 
+        state.advance();
+
+        assertThat(state.hasIterationStarted(), is(true));
         assertThat(state.isExhausted(), is(true));
     }
 
@@ -122,20 +129,29 @@ class MvPartitionDeliveryStateTest extends 
BaseIgniteAbstractTest {
 
         var state = new MvPartitionDeliveryState(List.of(storage1, storage2));
 
+        assertThat(state.hasIterationStarted(), is(false));
+        assertThat(state.isExhausted(), is(false));
+
         IntStream.rangeClosed(1, 2).forEach(i -> {
+            state.advance();
+
+            assertThat(state.hasIterationStarted(), is(true));
             assertThat(state.isExhausted(), is(false));
+
             assertThat(state.currentTableId(), is(i));
             assertThat(state.currentRowId(), is(lowestRowId));
 
             state.advance();
 
+            assertThat(state.hasIterationStarted(), is(true));
             assertThat(state.isExhausted(), is(false));
             assertThat(state.currentTableId(), is(i));
             assertThat(state.currentRowId(), is(highestRowId));
-
-            state.advance();
         });
 
+        state.advance();
+
+        assertThat(state.hasIterationStarted(), is(true));
         assertThat(state.isExhausted(), is(true));
     }
 
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
index 3d88962deed..ab1c2784ef8 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse;
@@ -49,6 +50,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
 
 @ExtendWith(MockitoExtension.class)
 class OutgoingSnapshotCommonTest extends BaseIgniteAbstractTest {
+    private static final int ZONE_ID = 0;
     private static final int TABLE_ID_1 = 1;
     private static final int TABLE_ID_2 = 2;
 
@@ -65,13 +67,18 @@ class OutgoingSnapshotCommonTest extends 
BaseIgniteAbstractTest {
 
     private final PartitionReplicationMessagesFactory messagesFactory = new 
PartitionReplicationMessagesFactory();
 
-    private final PartitionKey partitionKey = new ZonePartitionKey(0, 1);
+    private final PartitionKey partitionKey = new ZonePartitionKey(ZONE_ID, 1);
 
     private static final int REQUIRED_CATALOG_VERSION = 42;
 
     @BeforeEach
-    void createTestInstance() {
-        
lenient().when(catalogService.catalog(anyInt())).thenReturn(mock(Catalog.class));
+    void createTestInstance(
+            @Mock Catalog catalog,
+            @Mock CatalogTableDescriptor tableDescriptor
+    ) {
+        lenient().when(catalogService.catalog(anyInt())).thenReturn(catalog);
+        lenient().when(catalog.table(anyInt())).thenReturn(tableDescriptor);
+        lenient().when(tableDescriptor.zoneId()).thenReturn(ZONE_ID);
 
         var partitionsByTableId = new 
Int2ObjectOpenHashMap<PartitionMvStorageAccess>();
 
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
index 72b05eb79b9..494ed37137e 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
@@ -27,16 +27,21 @@ import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
+import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
@@ -46,6 +51,7 @@ import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKe
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowImpl;
 import org.apache.ignite.internal.storage.ReadResult;
@@ -53,6 +59,7 @@ import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -60,6 +67,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
 
 @ExtendWith(MockitoExtension.class)
 class OutgoingSnapshotMvDataStreamingTest extends BaseIgniteAbstractTest {
+    private static final int ZONE_ID = 0;
+
     private static final BinaryRow ROW_1 = new BinaryRowImpl(0, 
ByteBuffer.wrap(new byte[]{1}));
     private static final BinaryRow ROW_2 = new BinaryRowImpl(0, 
ByteBuffer.wrap(new byte[]{2}));
 
@@ -74,6 +83,8 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
     @Mock
     private PartitionMvStorageAccess partitionAccess2;
 
+    private final Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId 
= new Int2ObjectOpenHashMap<>();
+
     @Mock
     private CatalogService catalogService;
 
@@ -93,17 +104,25 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
     private final UUID transactionId = UUID.randomUUID();
     private final int commitTableId = 999;
 
-    private final PartitionKey partitionKey = new ZonePartitionKey(0, 
PARTITION_ID);
+    private final PartitionKey partitionKey = new ZonePartitionKey(ZONE_ID, 
PARTITION_ID);
 
     @BeforeEach
-    void createTestInstance() {
-        lenient().when(partitionAccess1.tableId()).thenReturn(TABLE_ID_1);
+    void createTestInstance(
+            @Mock Catalog catalog,
+            @Mock CatalogTableDescriptor tableDescriptor,
+            @Mock RaftGroupConfiguration raftGroupConfiguration
+    ) {
+        when(partitionAccess1.tableId()).thenReturn(TABLE_ID_1);
         
lenient().when(partitionAccess1.partitionId()).thenReturn(PARTITION_ID);
+        
when(partitionAccess1.committedGroupConfiguration()).thenReturn(raftGroupConfiguration);
 
-        lenient().when(partitionAccess2.tableId()).thenReturn(TABLE_ID_2);
+        when(partitionAccess2.tableId()).thenReturn(TABLE_ID_2);
         
lenient().when(partitionAccess2.partitionId()).thenReturn(PARTITION_ID);
+        
lenient().when(partitionAccess2.committedGroupConfiguration()).thenReturn(raftGroupConfiguration);
 
-        var partitionsByTableId = new 
Int2ObjectOpenHashMap<PartitionMvStorageAccess>();
+        when(catalogService.catalog(anyInt())).thenReturn(catalog);
+        when(catalog.table(anyInt())).thenReturn(tableDescriptor);
+        when(tableDescriptor.zoneId()).thenReturn(ZONE_ID);
 
         partitionsByTableId.put(TABLE_ID_1, partitionAccess1);
         partitionsByTableId.put(TABLE_ID_2, partitionAccess2);
@@ -115,6 +134,14 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
                 mock(PartitionTxStateAccess.class),
                 catalogService
         );
+
+        snapshot.acquireMvLock();
+
+        try {
+            snapshot.freezeScopeUnderMvLock();
+        } finally {
+            snapshot.releaseMvLock();
+        }
     }
 
     @BeforeEach
@@ -128,6 +155,31 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
         rowIdOutOfOrder = id;
     }
 
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-24517")
+    @Test
+    void sendsEmptyResponseForEmptyMvData() {
+        snapshot = new OutgoingSnapshot(
+                UUID.randomUUID(),
+                partitionKey,
+                Int2ObjectMaps.emptyMap(),
+                mock(PartitionTxStateAccess.class),
+                catalogService
+        );
+
+        snapshot.acquireMvLock();
+
+        try {
+            snapshot.freezeScopeUnderMvLock();
+        } finally {
+            snapshot.releaseMvLock();
+        }
+
+        SnapshotMvDataResponse response = getMvDataResponse(Long.MAX_VALUE);
+
+        assertThat(response.rows(), is(emptyList()));
+        assertThat(response.finish(), is(true));
+    }
+
     @Test
     void sendsCommittedAndUncommittedVersionsFromStorage() {
         ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, 
clock.now());
@@ -493,7 +545,7 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
         snapshot.acquireMvLock();
 
         try {
-            assertFalse(snapshot.alreadyPassed(TABLE_ID_1, lowestRowId));
+            assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, 
lowestRowId));
         } finally {
             snapshot.releaseMvLock();
         }
@@ -513,7 +565,7 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
         snapshot.acquireMvLock();
 
         try {
-            assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId1));
+            assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId1));
         } finally {
             snapshot.releaseMvLock();
         }
@@ -533,7 +585,7 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
         snapshot.acquireMvLock();
 
         try {
-            assertFalse(snapshot.alreadyPassed(TABLE_ID_1, rowId2));
+            assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, 
rowId2));
         } finally {
             snapshot.releaseMvLock();
         }
@@ -557,38 +609,38 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
         snapshot.acquireMvLock();
 
         try {
-            assertFalse(snapshot.alreadyPassed(TABLE_ID_1, rowId1));
-            assertFalse(snapshot.alreadyPassed(TABLE_ID_1, rowId2));
-            assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId1));
-            assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId2));
+            assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, 
rowId1));
+            assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, 
rowId2));
+            assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2, 
rowId1));
+            assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2, 
rowId2));
 
             getMvDataResponse(1);
 
-            assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId1));
-            assertFalse(snapshot.alreadyPassed(TABLE_ID_1, rowId2));
-            assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId1));
-            assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId2));
+            assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId1));
+            assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, 
rowId2));
+            assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2, 
rowId1));
+            assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2, 
rowId2));
 
             getMvDataResponse(1);
 
-            assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId1));
-            assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId2));
-            assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId1));
-            assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId2));
+            assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId1));
+            assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId2));
+            assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2, 
rowId1));
+            assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2, 
rowId2));
 
             getMvDataResponse(1);
 
-            assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId1));
-            assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId2));
-            assertTrue(snapshot.alreadyPassed(TABLE_ID_2, rowId1));
-            assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId2));
+            assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId1));
+            assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId2));
+            assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2, rowId1));
+            assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2, 
rowId2));
 
             getMvDataResponse(1);
 
-            assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId1));
-            assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId2));
-            assertTrue(snapshot.alreadyPassed(TABLE_ID_2, rowId1));
-            assertTrue(snapshot.alreadyPassed(TABLE_ID_2, rowId2));
+            assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId1));
+            assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId2));
+            assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2, rowId1));
+            assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2, rowId2));
         } finally {
             snapshot.releaseMvLock();
         }
@@ -606,7 +658,7 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
 
         try {
             //noinspection ConstantConditions
-            assertTrue(snapshot.alreadyPassed(TABLE_ID_1, 
rowId3.increment().increment().increment()));
+            assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, 
rowId3.increment().increment().increment()));
         } finally {
             snapshot.releaseMvLock();
         }
@@ -618,4 +670,45 @@ class OutgoingSnapshotMvDataStreamingTest extends 
BaseIgniteAbstractTest {
 
         assertThat(getNullableMvDataResponse(Long.MAX_VALUE), is(nullValue()));
     }
+
+    @Test
+    void doesNotReturnRowsFromNewTables(
+            @Mock PartitionMvStorageAccess partitionAccess3,
+            @Mock PartitionMvStorageAccess partitionAccess4
+    ) {
+        partitionsByTableId.put(Integer.MIN_VALUE, partitionAccess3);
+        partitionsByTableId.put(Integer.MAX_VALUE, partitionAccess4);
+
+        ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, 
clock.now());
+        ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, 
clock.now());
+
+        for (PartitionMvStorageAccess partitionAccess : 
List.of(partitionAccess1, partitionAccess2, partitionAccess3, 
partitionAccess4)) {
+            
lenient().when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
+            
lenient().when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1,
 version2));
+            
lenient().when(partitionAccess.closestRowId(rowId2)).thenReturn(rowId2);
+            
lenient().when(partitionAccess.getAllRowVersions(rowId2)).thenReturn(List.of(version1));
+        }
+
+        snapshot.acquireMvLock();
+
+        try {
+            assertThat(snapshot.alreadyPassedOrIrrelevant(Integer.MIN_VALUE, 
rowId1), is(true));
+            assertThat(snapshot.alreadyPassedOrIrrelevant(Integer.MAX_VALUE, 
rowId1), is(true));
+        } finally {
+            snapshot.releaseMvLock();
+        }
+
+        SnapshotMvDataResponse response = getMvDataResponse(Long.MAX_VALUE);
+
+        assertThat(response.rows(), hasSize(4));
+
+        snapshot.acquireMvLock();
+
+        try {
+            assertThat(snapshot.alreadyPassedOrIrrelevant(Integer.MIN_VALUE, 
rowId1), is(true));
+            assertThat(snapshot.alreadyPassedOrIrrelevant(Integer.MAX_VALUE, 
rowId1), is(true));
+        } finally {
+            snapshot.releaseMvLock();
+        }
+    }
 }
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
index b55df3819d4..154ec245855 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
@@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -29,6 +30,7 @@ import java.io.IOException;
 import java.util.concurrent.Executor;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
@@ -39,24 +41,38 @@ import 
org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 /**
  * For {@link OutgoingSnapshotReader} testing.
  */
+@ExtendWith(MockitoExtension.class)
 public class OutgoingSnapshotReaderTest extends BaseIgniteAbstractTest {
+    private static final int ZONE_ID = 0;
     private static final int TABLE_ID_1 = 1;
     private static final int TABLE_ID_2 = 2;
 
     @Test
-    void testForChoosingMaximumAppliedIndexForMeta() throws IOException {
-        PartitionMvStorageAccess partitionAccess1 = 
mock(PartitionMvStorageAccess.class);
-        PartitionMvStorageAccess partitionAccess2 = 
mock(PartitionMvStorageAccess.class);
+    void testForChoosingMaximumAppliedIndexForMeta(
+            @Mock CatalogService catalogService,
+            @Mock Catalog catalog,
+            @Mock CatalogTableDescriptor tableDescriptor,
+            @Mock RaftGroupConfiguration raftGroupConfiguration,
+            @Mock PartitionMvStorageAccess partitionAccess1,
+            @Mock PartitionMvStorageAccess partitionAccess2,
+            @Mock OutgoingSnapshotsManager outgoingSnapshotsManager,
+            @Mock PartitionTxStateAccess txStateAccess
+    ) throws IOException {
+        when(catalogService.catalog(anyInt())).thenReturn(catalog);
+        when(catalog.table(anyInt())).thenReturn(tableDescriptor);
+        when(tableDescriptor.zoneId()).thenReturn(ZONE_ID);
 
         when(partitionAccess1.tableId()).thenReturn(TABLE_ID_1);
         when(partitionAccess2.tableId()).thenReturn(TABLE_ID_2);
-        
when(partitionAccess2.committedGroupConfiguration()).thenReturn(mock(RaftGroupConfiguration.class));
+        
when(partitionAccess2.committedGroupConfiguration()).thenReturn(raftGroupConfiguration);
 
-        OutgoingSnapshotsManager outgoingSnapshotsManager = 
mock(OutgoingSnapshotsManager.class);
         doAnswer(invocation -> {
             OutgoingSnapshot snapshot = invocation.getArgument(1);
 
@@ -65,11 +81,6 @@ public class OutgoingSnapshotReaderTest extends 
BaseIgniteAbstractTest {
             return null;
         }).when(outgoingSnapshotsManager).startOutgoingSnapshot(any(), any());
 
-        CatalogService catalogService = mock(CatalogService.class);
-        when(catalogService.catalog(anyInt())).thenReturn(mock(Catalog.class));
-
-        PartitionTxStateAccess txStateAccess = 
mock(PartitionTxStateAccess.class);
-
         var partitionsByTableId = new 
Int2ObjectOpenHashMap<PartitionMvStorageAccess>();
 
         partitionsByTableId.put(TABLE_ID_1, partitionAccess1);
@@ -93,7 +104,7 @@ public class OutgoingSnapshotReaderTest extends 
BaseIgniteAbstractTest {
         when(txStateAccess.lastAppliedIndex()).thenReturn(10L);
 
         when(txStateAccess.lastAppliedTerm()).thenReturn(1L);
-        when(partitionAccess1.lastAppliedTerm()).thenReturn(2L);
+        lenient().when(partitionAccess1.lastAppliedTerm()).thenReturn(2L);
         when(partitionAccess2.lastAppliedTerm()).thenReturn(3L);
 
         try (var reader = new OutgoingSnapshotReader(snapshotStorage)) {
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
index 6df959128f3..1dbb6fc6369 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
@@ -217,7 +217,7 @@ class SnapshotAwarePartitionDataStorageTest extends 
BaseIgniteAbstractTest {
     void notYetPassedRowIsEnqueued(MvWriteAction writeAction) {
         
when(partitionSnapshots.ongoingSnapshots()).thenReturn(List.of(snapshot));
 
-        doReturn(false).when(snapshot).alreadyPassed(eq(TABLE_ID), any());
+        doReturn(false).when(snapshot).alreadyPassedOrIrrelevant(eq(TABLE_ID), 
any());
         doReturn(true).when(snapshot).addRowIdToSkip(any());
 
         writeAction.executeOn(testedStorage, rowId);
@@ -230,7 +230,7 @@ class SnapshotAwarePartitionDataStorageTest extends 
BaseIgniteAbstractTest {
     void notYetPassedRowNotEnqueuedSecondTime(MvWriteAction writeAction) {
         
when(partitionSnapshots.ongoingSnapshots()).thenReturn(List.of(snapshot));
 
-        doReturn(false).when(snapshot).alreadyPassed(eq(TABLE_ID), any());
+        doReturn(false).when(snapshot).alreadyPassedOrIrrelevant(eq(TABLE_ID), 
any());
         doReturn(false).when(snapshot).addRowIdToSkip(any());
 
         writeAction.executeOn(testedStorage, rowId);
@@ -243,7 +243,7 @@ class SnapshotAwarePartitionDataStorageTest extends 
BaseIgniteAbstractTest {
     void alreadyPassedRowNotEnqueued(MvWriteAction writeAction) {
         
when(partitionSnapshots.ongoingSnapshots()).thenReturn(List.of(snapshot));
 
-        doReturn(true).when(snapshot).alreadyPassed(eq(TABLE_ID), any());
+        doReturn(true).when(snapshot).alreadyPassedOrIrrelevant(eq(TABLE_ID), 
any());
 
         writeAction.executeOn(testedStorage, rowId);
 
@@ -263,7 +263,7 @@ class SnapshotAwarePartitionDataStorageTest extends 
BaseIgniteAbstractTest {
     }
 
     private static void 
configureSnapshotToLetEnqueueOutOfOrderMvRow(OutgoingSnapshot 
snapshotToConfigure) {
-        doReturn(false).when(snapshotToConfigure).alreadyPassed(eq(TABLE_ID), 
any());
+        
doReturn(false).when(snapshotToConfigure).alreadyPassedOrIrrelevant(eq(TABLE_ID),
 any());
         doReturn(true).when(snapshotToConfigure).addRowIdToSkip(any());
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
index 246b00e6424..c73c6bd897c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
@@ -175,7 +175,7 @@ public class SnapshotAwarePartitionDataStorage implements 
PartitionDataStorage {
             snapshot.acquireMvLock();
 
             try {
-                if (snapshot.alreadyPassed(tableId, rowId)) {
+                if (snapshot.alreadyPassedOrIrrelevant(tableId, rowId)) {
                     // Row already sent.
                     continue;
                 }

Reply via email to