This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c45763d2c2a IGNITE-24522 Fix races on Raft snapshot creation and
adding a table to a zone. (#5242)
c45763d2c2a is described below
commit c45763d2c2ad32ff84d1dd6a85303093f8f585d5
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Feb 18 15:00:49 2025 +0200
IGNITE-24522 Fix races on Raft snapshot creation and adding a table to a
zone. (#5242)
---
.../snapshot/PartitionSnapshotStorageFactory.java | 1 -
.../outgoing/MvPartitionDeliveryState.java | 45 ++++--
.../raft/snapshot/outgoing/OutgoingSnapshot.java | 158 +++++++++++++++------
.../outgoing/MvPartitionDeliveryStateTest.java | 30 +++-
.../outgoing/OutgoingSnapshotCommonTest.java | 13 +-
.../OutgoingSnapshotMvDataStreamingTest.java | 151 ++++++++++++++++----
.../outgoing/OutgoingSnapshotReaderTest.java | 33 +++--
.../SnapshotAwarePartitionDataStorageTest.java | 8 +-
.../SnapshotAwarePartitionDataStorage.java | 2 +-
9 files changed, 328 insertions(+), 113 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
index 07a7119f447..0526a77ef31 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorageFactory.java
@@ -92,7 +92,6 @@ public class PartitionSnapshotStorageFactory implements
SnapshotStorageFactory {
* Adds a given table partition storage to the snapshot storage, managed
by this factory.
*/
public void addMvPartition(int tableId, PartitionMvStorageAccess
partition) {
- // FIXME: there are possible races with table creation, see
https://issues.apache.org/jira/browse/IGNITE-24522
PartitionMvStorageAccess prev = partitionsByTableId.put(tableId,
partition);
assert prev == null : "Partition storage for table ID " + tableId + "
already exists.";
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryState.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryState.java
index a13fc12ba07..a18052bee63 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryState.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryState.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
-import java.util.Collection;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
import java.util.Iterator;
+import java.util.List;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import org.apache.ignite.internal.storage.RowId;
import org.jetbrains.annotations.Nullable;
@@ -29,22 +31,29 @@ import org.jetbrains.annotations.Nullable;
class MvPartitionDeliveryState {
private final Iterator<PartitionMvStorageAccess> partitionStoragesIterator;
- /**
- * Current row ID within the current partition storage. Can be {@code
null} only if the snapshot has delivered all possible data.
- */
+ /** Current row ID within the current partition storage. */
@Nullable
private RowId currentRowId;
- /**
- * Current partition storage. Can be {@code null} only if the snapshot has
delivered all possible data.
- */
+ /** Current partition storage. */
@Nullable
private PartitionMvStorageAccess currentPartitionStorage;
- MvPartitionDeliveryState(Collection<PartitionMvStorageAccess>
partitionStorages) {
+ private final IntSet tableIds;
+
+ private boolean isStarted = false;
+
+ /**
+ * Creates a new state. The state is initially positioned before the first
row of the first storage.
+ *
+ * @param partitionStorages Partition storages to iterate over. They
<b>must</b> be sorted by table ID in ascending order.
+ */
+ MvPartitionDeliveryState(List<PartitionMvStorageAccess> partitionStorages)
{
this.partitionStoragesIterator = partitionStorages.iterator();
- advance();
+ tableIds = new IntOpenHashSet(partitionStorages.size());
+
+ partitionStorages.forEach(storage -> tableIds.add(storage.tableId()));
}
RowId currentRowId() {
@@ -63,14 +72,30 @@ class MvPartitionDeliveryState {
return currentPartitionStorage().tableId();
}
+ /**
+ * Returns {@code true} if the given table ID is in the range of tables
that this state iterates over.
+ */
+ boolean isGoingToBeDelivered(int tableId) {
+ return tableIds.contains(tableId);
+ }
+
/**
* Returns {@code true} if all data for the snapshot has been retrieved.
*/
boolean isExhausted() {
- return currentPartitionStorage == null;
+ return currentPartitionStorage == null &&
!partitionStoragesIterator.hasNext();
+ }
+
+ /**
+ * Returns {@code true} if the {@link #advance()} method has been called
at least once.
+ */
+ boolean hasIterationStarted() {
+ return isStarted;
}
void advance() {
+ isStarted = true;
+
while (true) {
if (currentPartitionStorage == null) {
if (!partitionStoragesIterator.hasNext()) {
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
index a87b83f4ef0..cea03d1bba5 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -18,15 +18,17 @@
package org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing;
import static java.lang.Math.max;
+import static java.util.Comparator.comparingInt;
import static java.util.Comparator.comparingLong;
+import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.SnapshotMetaUtils.collectNextRowIdToBuildIndexes;
import static
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.SnapshotMetaUtils.snapshotMetaAt;
-import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -34,7 +36,9 @@ import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -52,6 +56,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDa
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKey;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
+import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -61,7 +66,6 @@ import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.TxMetaMessage;
import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
import org.jetbrains.annotations.Nullable;
/**
@@ -83,15 +87,15 @@ public class OutgoingSnapshot {
private final PartitionKey partitionKey;
- private final Int2ObjectSortedMap<PartitionMvStorageAccess>
partitionsByTableId;
+ private final Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId;
private final PartitionTxStateAccess txState;
private final CatalogService catalogService;
/**
- * Lock that is used for mutual exclusion of MV snapshot reading (by this
class) and threads that write MV data to the same
- * partition (currently, via {@link PartitionDataStorage}).
+ * Lock that is used for mutual exclusion of MV snapshot reading (by this
class) and threads that write MV data to the same partition
+ * (currently, via {@link PartitionDataStorage}).
*/
private final ReentrantLock mvOperationsLock = new ReentrantLock();
@@ -100,30 +104,42 @@ public class OutgoingSnapshot {
private volatile PartitionSnapshotMeta frozenMeta;
/**
- * {@link RowId}s for which the corresponding rows were sent out of order
(relative to the order in which this
- * snapshot sends rows), hence they must be skipped when sending rows
normally.
+ * {@link RowId}s for which the corresponding rows were sent out of order
(relative to the order in which this snapshot sends rows),
+ * hence they must be skipped when sending rows normally.
+ *
+ * <p>Multi-threaded access is guarded by {@link #mvOperationsLock}.
*/
- private final Set<RowId> rowIdsToSkip = new ConcurrentHashSet<>();
+ private final Set<RowId> rowIdsToSkip = new HashSet<>();
// TODO: IGNITE-18018 - manage queue size
/**
- * Rows that need to be sent out of order (relative to the order in which
this snapshot sends rows).
- * Versions inside rows are in oldest-to-newest order.
+ * Rows that need to be sent out of order (relative to the order in which
this snapshot sends rows). Versions inside rows are in
+ * oldest-to-newest order.
+ *
+ * <p>Multi-threaded access is guarded by {@link #mvOperationsLock}.
*/
private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData
= new ArrayDeque<>();
/**
- * Current delivery state of MV partition data. Can be {@code null} only
if the delivery has not started yet.
+ * Current delivery state of MV partition data.
+ *
+ * <p>Multi-threaded access is guarded by {@link #mvOperationsLock}.
*/
@Nullable
private MvPartitionDeliveryState mvPartitionDeliveryState;
+ /**
+ * Cursor over TX data.
+ *
+ * <p>Inter-thread visibility is provided by accessing {@link
#finishedTxData} in a correct order.
+ */
+ @Nullable
private Cursor<IgniteBiTuple<UUID, TxMeta>> txDataCursor;
/**
* This becomes {@code true} as soon as we exhaust TX data in the
partition.
*/
- private boolean finishedTxData = false;
+ private volatile boolean finishedTxData;
private volatile boolean closed = false;
@@ -139,8 +155,7 @@ public class OutgoingSnapshot {
) {
this.id = id;
this.partitionKey = partitionKey;
- // Create a sorted copy in order to process partitions in a
deterministic order.
- this.partitionsByTableId = new
Int2ObjectAVLTreeMap<>(partitionsByTableId);
+ this.partitionsByTableId = partitionsByTableId;
this.txState = txState;
this.catalogService = catalogService;
}
@@ -168,18 +183,27 @@ public class OutgoingSnapshot {
acquireMvLock();
try {
- frozenMeta = takeSnapshotMeta();
+ int catalogVersion = catalogService.latestCatalogVersion();
+
+ List<PartitionMvStorageAccess> partitionStorages =
freezePartitionStorages(catalogVersion);
+
+ frozenMeta = takeSnapshotMeta(catalogVersion, partitionStorages);
txDataCursor = txState.getAllTxMeta();
+
+ // Write the flag to publish the TX cursor (in the memory model
sense).
+ finishedTxData = false;
+
+ mvPartitionDeliveryState = new
MvPartitionDeliveryState(partitionStorages);
} finally {
releaseMvLock();
}
}
- private PartitionSnapshotMeta takeSnapshotMeta() {
+ private PartitionSnapshotMeta takeSnapshotMeta(int catalogVersion,
Collection<PartitionMvStorageAccess> partitionStorages) {
// TODO: partitionsByTableId will be empty for zones without tables,
need another way to get meta in that case,
// see https://issues.apache.org/jira/browse/IGNITE-24517
- PartitionMvStorageAccess partitionStorageWithMaxAppliedIndex =
partitionsByTableId.values().stream()
+ PartitionMvStorageAccess partitionStorageWithMaxAppliedIndex =
partitionStorages.stream()
.max(comparingLong(PartitionMvStorageAccess::lastAppliedIndex))
.orElseThrow();
@@ -187,11 +211,9 @@ public class OutgoingSnapshot {
assert config != null : "Configuration should never be null when
installing a snapshot";
- int catalogVersion = catalogService.latestCatalogVersion();
-
Map<Integer, UUID> nextRowIdToBuildByIndexId =
collectNextRowIdToBuildIndexes(
catalogService,
- partitionsByTableId.values(),
+ partitionStorages,
catalogVersion
);
@@ -207,6 +229,29 @@ public class OutgoingSnapshot {
);
}
+ private List<PartitionMvStorageAccess> freezePartitionStorages(int
catalogVersion) {
+ Catalog catalog = catalogService.catalog(catalogVersion);
+
+ if (partitionKey instanceof ZonePartitionKey) {
+ int zoneId = ((ZonePartitionKey) partitionKey).zoneId();
+
+ return partitionsByTableId.values().stream()
+ .filter(storage -> {
+ CatalogTableDescriptor tableDescriptor =
catalog.table(storage.tableId());
+
+ return tableDescriptor != null &&
tableDescriptor.zoneId() == zoneId;
+ })
+ .sorted(comparingInt(PartitionMvStorageAccess::tableId))
+ .collect(toList());
+ } else {
+ // TODO: remove this clause, see
https://issues.apache.org/jira/browse/IGNITE-22522
+ // For a non-colocation case we always have a single entry in this
map.
+ assert partitionsByTableId.size() == 1;
+
+ return List.copyOf(partitionsByTableId.values());
+ }
+ }
+
/**
* Returns metadata corresponding to this snapshot.
*
@@ -257,8 +302,6 @@ public class OutgoingSnapshot {
return logThatAlreadyClosedAndReturnNull();
}
- assert !finishedMvData() : "MV data sending has already been finished";
-
long totalBatchSize = 0;
List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>();
@@ -273,17 +316,15 @@ public class OutgoingSnapshot {
// As out-of-order rows are added under the same lock that we
hold, and we always send OOO data first,
// exhausting the partition means that no MV data to send is
left, we are finished with it.
if (finishedMvData() || batchIsFull(request, totalBatchSize)) {
- break;
+ return
PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotMvDataResponse()
+ .rows(batch)
+ .finish(finishedMvData())
+ .build();
}
} finally {
releaseMvLock();
}
}
-
- return PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotMvDataResponse()
- .rows(batch)
- .finish(finishedMvData())
- .build();
}
private long fillWithOutOfOrderRows(
@@ -331,11 +372,9 @@ public class OutgoingSnapshot {
return totalBatchSize;
}
- if (mvPartitionDeliveryState == null) {
- mvPartitionDeliveryState = new
MvPartitionDeliveryState(partitionsByTableId.values());
- } else {
- mvPartitionDeliveryState.advance();
- }
+ assert mvPartitionDeliveryState != null : "Snapshot scope has not been
frozen.";
+
+ mvPartitionDeliveryState.advance();
if (!finishedMvData()) {
RowId rowId = mvPartitionDeliveryState.currentRowId();
@@ -426,6 +465,12 @@ public class OutgoingSnapshot {
List<IgniteBiTuple<UUID, TxMeta>> rows = new ArrayList<>();
+ boolean finishedTxData = this.finishedTxData;
+
+ Cursor<IgniteBiTuple<UUID, TxMeta>> txDataCursor = this.txDataCursor;
+
+ assert txDataCursor != null : "Snapshot scope has not been frozen.";
+
while (!finishedTxData && rows.size() <
request.maxTransactionsInBatch()) {
if (txDataCursor.hasNext()) {
rows.add(txDataCursor.next());
@@ -435,6 +480,8 @@ public class OutgoingSnapshot {
}
}
+ this.finishedTxData = finishedTxData;
+
return buildTxDataResponse(rows, finishedTxData);
}
@@ -478,13 +525,14 @@ public class OutgoingSnapshot {
}
/**
- * Whether this snapshot is finished with sending MV data (i.e. it already
sent all the MV data and is not going to send anything else).
- *
- * <p>Must be called under snapshot lock.
+ * Returns {@code true} if this snapshot is finished with sending MV data
(i.e. it already sent all MV data and is not going to send
+ * anything else).
*
- * @return {@code true} if finished.
+ * <p>Must be called under {@link #mvOperationsLock}.
*/
private boolean finishedMvData() {
+ assert mvOperationsLock.isLocked() : "MV operations lock must be
acquired!";
+
return mvPartitionDeliveryState != null &&
mvPartitionDeliveryState.isExhausted();
}
@@ -503,26 +551,39 @@ public class OutgoingSnapshot {
}
/**
- * Returns {@code true} if the given {@link RowId} does not interfere with
the rows that this snapshot is going
- * to send in the normal snapshot rows sending order.
+ * Returns {@code true} if the given {@link RowId} does not interfere with
the rows that this snapshot is going to send in the normal
+ * snapshot rows sending order.
*
* <p>Must be called under MV data snapshot lock.
*
* @param rowId RowId.
- * @return {@code true} if the given RowId is already passed by the
snapshot in normal rows sending order.
+ * @return {@code true} if the given RowId is already passed by the
snapshot in normal rows sending order or if the RowId belongs to
+ * a table that is not a part of this snapshot at all.
*/
- public boolean alreadyPassed(int tableId, RowId rowId) {
+ public boolean alreadyPassedOrIrrelevant(int tableId, RowId rowId) {
assert mvOperationsLock.isLocked() : "MV operations lock must be
acquired!";
if (mvPartitionDeliveryState == null) {
- // We haven't started sending MV data yet.
+ // We haven't even frozen the snapshot scope yet.
return false;
}
- if (finishedMvData()) {
+ return !mvPartitionDeliveryState.isGoingToBeDelivered(tableId) ||
alreadyPassed(tableId, rowId);
+ }
+
+ private boolean alreadyPassed(int tableId, RowId rowId) {
+ assert mvPartitionDeliveryState != null;
+
+ if (mvPartitionDeliveryState.isExhausted()) {
+ // We have already finished delivering all data.
return true;
}
+ if (!mvPartitionDeliveryState.hasIterationStarted()) {
+ // We haven't started streaming the data yet.
+ return false;
+ }
+
if (tableId == mvPartitionDeliveryState.currentTableId()) {
// 'currentRowId' here has already been sent, hence the non-strict
comparison.
return rowId.compareTo(mvPartitionDeliveryState.currentRowId()) <=
0;
@@ -552,10 +613,13 @@ public class OutgoingSnapshot {
* Closes the snapshot releasing the underlying resources.
*/
public void close() {
- Cursor<IgniteBiTuple<UUID, TxMeta>> txCursor = txDataCursor;
+ if (!finishedTxData) {
+ Cursor<IgniteBiTuple<UUID, TxMeta>> txCursor = txDataCursor;
- if (txCursor != null) {
- closeLoggingProblems(txCursor);
+ if (txCursor != null) {
+ closeLoggingProblems(txCursor);
+ finishedTxData = true;
+ }
}
closed = true;
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryStateTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryStateTest.java
index 3cae52762c4..6ec8d503127 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryStateTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/MvPartitionDeliveryStateTest.java
@@ -42,6 +42,7 @@ class MvPartitionDeliveryStateTest extends
BaseIgniteAbstractTest {
void emptyStateIsExhausted() {
var state = new MvPartitionDeliveryState(List.of());
+ assertThat(state.hasIterationStarted(), is(false));
assertThat(state.isExhausted(), is(true));
}
@@ -74,29 +75,35 @@ class MvPartitionDeliveryStateTest extends
BaseIgniteAbstractTest {
var state = new MvPartitionDeliveryState(storages);
+ assertThat(state.hasIterationStarted(), is(false));
+ assertThat(state.isExhausted(), is(false));
+
for (int i = 0; i < storages.size(); i++) {
- assertThat(state.isExhausted(), is(false));
+ state.advance();
+ assertThat(state.hasIterationStarted(), is(true));
+ assertThat(state.isExhausted(), is(false));
assertThat(state.currentTableId(), is(i));
assertThat(state.currentRowId(), is(rowIdsByTable.get(i).get(0)));
state.advance();
+ assertThat(state.hasIterationStarted(), is(true));
assertThat(state.isExhausted(), is(false));
-
assertThat(state.currentTableId(), is(i));
assertThat(state.currentRowId(), is(rowIdsByTable.get(i).get(1)));
state.advance();
+ assertThat(state.hasIterationStarted(), is(true));
assertThat(state.isExhausted(), is(false));
-
assertThat(state.currentTableId(), is(i));
assertThat(state.currentRowId(), is(rowIdsByTable.get(i).get(2)));
-
- state.advance();
}
+ state.advance();
+
+ assertThat(state.hasIterationStarted(), is(true));
assertThat(state.isExhausted(), is(true));
}
@@ -122,20 +129,29 @@ class MvPartitionDeliveryStateTest extends
BaseIgniteAbstractTest {
var state = new MvPartitionDeliveryState(List.of(storage1, storage2));
+ assertThat(state.hasIterationStarted(), is(false));
+ assertThat(state.isExhausted(), is(false));
+
IntStream.rangeClosed(1, 2).forEach(i -> {
+ state.advance();
+
+ assertThat(state.hasIterationStarted(), is(true));
assertThat(state.isExhausted(), is(false));
+
assertThat(state.currentTableId(), is(i));
assertThat(state.currentRowId(), is(lowestRowId));
state.advance();
+ assertThat(state.hasIterationStarted(), is(true));
assertThat(state.isExhausted(), is(false));
assertThat(state.currentTableId(), is(i));
assertThat(state.currentRowId(), is(highestRowId));
-
- state.advance();
});
+ state.advance();
+
+ assertThat(state.hasIterationStarted(), is(true));
assertThat(state.isExhausted(), is(true));
}
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
index 3d88962deed..ab1c2784ef8 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotCommonTest.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.UUID;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaRequest;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse;
@@ -49,6 +50,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class OutgoingSnapshotCommonTest extends BaseIgniteAbstractTest {
+ private static final int ZONE_ID = 0;
private static final int TABLE_ID_1 = 1;
private static final int TABLE_ID_2 = 2;
@@ -65,13 +67,18 @@ class OutgoingSnapshotCommonTest extends
BaseIgniteAbstractTest {
private final PartitionReplicationMessagesFactory messagesFactory = new
PartitionReplicationMessagesFactory();
- private final PartitionKey partitionKey = new ZonePartitionKey(0, 1);
+ private final PartitionKey partitionKey = new ZonePartitionKey(ZONE_ID, 1);
private static final int REQUIRED_CATALOG_VERSION = 42;
@BeforeEach
- void createTestInstance() {
-
lenient().when(catalogService.catalog(anyInt())).thenReturn(mock(Catalog.class));
+ void createTestInstance(
+ @Mock Catalog catalog,
+ @Mock CatalogTableDescriptor tableDescriptor
+ ) {
+ lenient().when(catalogService.catalog(anyInt())).thenReturn(catalog);
+ lenient().when(catalog.table(anyInt())).thenReturn(tableDescriptor);
+ lenient().when(tableDescriptor.zoneId()).thenReturn(ZONE_ID);
var partitionsByTableId = new
Int2ObjectOpenHashMap<PartitionMvStorageAccess>();
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
index 72b05eb79b9..494ed37137e 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
@@ -27,16 +27,21 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
+import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
@@ -46,6 +51,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionKe
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionTxStateAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.ZonePartitionKey;
+import org.apache.ignite.internal.raft.RaftGroupConfiguration;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowImpl;
import org.apache.ignite.internal.storage.ReadResult;
@@ -53,6 +59,7 @@ import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -60,6 +67,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class OutgoingSnapshotMvDataStreamingTest extends BaseIgniteAbstractTest {
+ private static final int ZONE_ID = 0;
+
private static final BinaryRow ROW_1 = new BinaryRowImpl(0,
ByteBuffer.wrap(new byte[]{1}));
private static final BinaryRow ROW_2 = new BinaryRowImpl(0,
ByteBuffer.wrap(new byte[]{2}));
@@ -74,6 +83,8 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
@Mock
private PartitionMvStorageAccess partitionAccess2;
+ private final Int2ObjectMap<PartitionMvStorageAccess> partitionsByTableId
= new Int2ObjectOpenHashMap<>();
+
@Mock
private CatalogService catalogService;
@@ -93,17 +104,25 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
private final UUID transactionId = UUID.randomUUID();
private final int commitTableId = 999;
- private final PartitionKey partitionKey = new ZonePartitionKey(0,
PARTITION_ID);
+ private final PartitionKey partitionKey = new ZonePartitionKey(ZONE_ID,
PARTITION_ID);
@BeforeEach
- void createTestInstance() {
- lenient().when(partitionAccess1.tableId()).thenReturn(TABLE_ID_1);
+ void createTestInstance(
+ @Mock Catalog catalog,
+ @Mock CatalogTableDescriptor tableDescriptor,
+ @Mock RaftGroupConfiguration raftGroupConfiguration
+ ) {
+ when(partitionAccess1.tableId()).thenReturn(TABLE_ID_1);
lenient().when(partitionAccess1.partitionId()).thenReturn(PARTITION_ID);
+
when(partitionAccess1.committedGroupConfiguration()).thenReturn(raftGroupConfiguration);
- lenient().when(partitionAccess2.tableId()).thenReturn(TABLE_ID_2);
+ when(partitionAccess2.tableId()).thenReturn(TABLE_ID_2);
lenient().when(partitionAccess2.partitionId()).thenReturn(PARTITION_ID);
+
lenient().when(partitionAccess2.committedGroupConfiguration()).thenReturn(raftGroupConfiguration);
- var partitionsByTableId = new
Int2ObjectOpenHashMap<PartitionMvStorageAccess>();
+ when(catalogService.catalog(anyInt())).thenReturn(catalog);
+ when(catalog.table(anyInt())).thenReturn(tableDescriptor);
+ when(tableDescriptor.zoneId()).thenReturn(ZONE_ID);
partitionsByTableId.put(TABLE_ID_1, partitionAccess1);
partitionsByTableId.put(TABLE_ID_2, partitionAccess2);
@@ -115,6 +134,14 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
mock(PartitionTxStateAccess.class),
catalogService
);
+
+ snapshot.acquireMvLock();
+
+ try {
+ snapshot.freezeScopeUnderMvLock();
+ } finally {
+ snapshot.releaseMvLock();
+ }
}
@BeforeEach
@@ -128,6 +155,31 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
rowIdOutOfOrder = id;
}
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-24517")
+ @Test
+ void sendsEmptyResponseForEmptyMvData() {
+ snapshot = new OutgoingSnapshot(
+ UUID.randomUUID(),
+ partitionKey,
+ Int2ObjectMaps.emptyMap(),
+ mock(PartitionTxStateAccess.class),
+ catalogService
+ );
+
+ snapshot.acquireMvLock();
+
+ try {
+ snapshot.freezeScopeUnderMvLock();
+ } finally {
+ snapshot.releaseMvLock();
+ }
+
+ SnapshotMvDataResponse response = getMvDataResponse(Long.MAX_VALUE);
+
+ assertThat(response.rows(), is(emptyList()));
+ assertThat(response.finish(), is(true));
+ }
+
@Test
void sendsCommittedAndUncommittedVersionsFromStorage() {
ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1,
clock.now());
@@ -493,7 +545,7 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
snapshot.acquireMvLock();
try {
- assertFalse(snapshot.alreadyPassed(TABLE_ID_1, lowestRowId));
+ assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1,
lowestRowId));
} finally {
snapshot.releaseMvLock();
}
@@ -513,7 +565,7 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
snapshot.acquireMvLock();
try {
- assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId1));
+ assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId1));
} finally {
snapshot.releaseMvLock();
}
@@ -533,7 +585,7 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
snapshot.acquireMvLock();
try {
- assertFalse(snapshot.alreadyPassed(TABLE_ID_1, rowId2));
+ assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1,
rowId2));
} finally {
snapshot.releaseMvLock();
}
@@ -557,38 +609,38 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
snapshot.acquireMvLock();
try {
- assertFalse(snapshot.alreadyPassed(TABLE_ID_1, rowId1));
- assertFalse(snapshot.alreadyPassed(TABLE_ID_1, rowId2));
- assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId1));
- assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId2));
+ assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1,
rowId1));
+ assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1,
rowId2));
+ assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2,
rowId1));
+ assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2,
rowId2));
getMvDataResponse(1);
- assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId1));
- assertFalse(snapshot.alreadyPassed(TABLE_ID_1, rowId2));
- assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId1));
- assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId2));
+ assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId1));
+ assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1,
rowId2));
+ assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2,
rowId1));
+ assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2,
rowId2));
getMvDataResponse(1);
- assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId1));
- assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId2));
- assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId1));
- assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId2));
+ assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId1));
+ assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId2));
+ assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2,
rowId1));
+ assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2,
rowId2));
getMvDataResponse(1);
- assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId1));
- assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId2));
- assertTrue(snapshot.alreadyPassed(TABLE_ID_2, rowId1));
- assertFalse(snapshot.alreadyPassed(TABLE_ID_2, rowId2));
+ assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId1));
+ assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId2));
+ assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2, rowId1));
+ assertFalse(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2,
rowId2));
getMvDataResponse(1);
- assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId1));
- assertTrue(snapshot.alreadyPassed(TABLE_ID_1, rowId2));
- assertTrue(snapshot.alreadyPassed(TABLE_ID_2, rowId1));
- assertTrue(snapshot.alreadyPassed(TABLE_ID_2, rowId2));
+ assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId1));
+ assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1, rowId2));
+ assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2, rowId1));
+ assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_2, rowId2));
} finally {
snapshot.releaseMvLock();
}
@@ -606,7 +658,7 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
try {
//noinspection ConstantConditions
- assertTrue(snapshot.alreadyPassed(TABLE_ID_1,
rowId3.increment().increment().increment()));
+ assertTrue(snapshot.alreadyPassedOrIrrelevant(TABLE_ID_1,
rowId3.increment().increment().increment()));
} finally {
snapshot.releaseMvLock();
}
@@ -618,4 +670,45 @@ class OutgoingSnapshotMvDataStreamingTest extends
BaseIgniteAbstractTest {
assertThat(getNullableMvDataResponse(Long.MAX_VALUE), is(nullValue()));
}
+
+ @Test
+ void doesNotReturnRowsFromNewTables(
+ @Mock PartitionMvStorageAccess partitionAccess3,
+ @Mock PartitionMvStorageAccess partitionAccess4
+ ) {
+ partitionsByTableId.put(Integer.MIN_VALUE, partitionAccess3);
+ partitionsByTableId.put(Integer.MAX_VALUE, partitionAccess4);
+
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1,
clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2,
clock.now());
+
+ for (PartitionMvStorageAccess partitionAccess :
List.of(partitionAccess1, partitionAccess2, partitionAccess3,
partitionAccess4)) {
+
lenient().when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
+
lenient().when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1,
version2));
+
lenient().when(partitionAccess.closestRowId(rowId2)).thenReturn(rowId2);
+
lenient().when(partitionAccess.getAllRowVersions(rowId2)).thenReturn(List.of(version1));
+ }
+
+ snapshot.acquireMvLock();
+
+ try {
+ assertThat(snapshot.alreadyPassedOrIrrelevant(Integer.MIN_VALUE,
rowId1), is(true));
+ assertThat(snapshot.alreadyPassedOrIrrelevant(Integer.MAX_VALUE,
rowId1), is(true));
+ } finally {
+ snapshot.releaseMvLock();
+ }
+
+ SnapshotMvDataResponse response = getMvDataResponse(Long.MAX_VALUE);
+
+ assertThat(response.rows(), hasSize(4));
+
+ snapshot.acquireMvLock();
+
+ try {
+ assertThat(snapshot.alreadyPassedOrIrrelevant(Integer.MIN_VALUE,
rowId1), is(true));
+ assertThat(snapshot.alreadyPassedOrIrrelevant(Integer.MAX_VALUE,
rowId1), is(true));
+ } finally {
+ snapshot.releaseMvLock();
+ }
+ }
}
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
index b55df3819d4..154ec245855 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshotReaderTest.java
@@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -29,6 +30,7 @@ import java.io.IOException;
import java.util.concurrent.Executor;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionMvStorageAccess;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionSnapshotStorage;
@@ -39,24 +41,38 @@ import
org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
/**
* For {@link OutgoingSnapshotReader} testing.
*/
+@ExtendWith(MockitoExtension.class)
public class OutgoingSnapshotReaderTest extends BaseIgniteAbstractTest {
+ private static final int ZONE_ID = 0;
private static final int TABLE_ID_1 = 1;
private static final int TABLE_ID_2 = 2;
@Test
- void testForChoosingMaximumAppliedIndexForMeta() throws IOException {
- PartitionMvStorageAccess partitionAccess1 =
mock(PartitionMvStorageAccess.class);
- PartitionMvStorageAccess partitionAccess2 =
mock(PartitionMvStorageAccess.class);
+ void testForChoosingMaximumAppliedIndexForMeta(
+ @Mock CatalogService catalogService,
+ @Mock Catalog catalog,
+ @Mock CatalogTableDescriptor tableDescriptor,
+ @Mock RaftGroupConfiguration raftGroupConfiguration,
+ @Mock PartitionMvStorageAccess partitionAccess1,
+ @Mock PartitionMvStorageAccess partitionAccess2,
+ @Mock OutgoingSnapshotsManager outgoingSnapshotsManager,
+ @Mock PartitionTxStateAccess txStateAccess
+ ) throws IOException {
+ when(catalogService.catalog(anyInt())).thenReturn(catalog);
+ when(catalog.table(anyInt())).thenReturn(tableDescriptor);
+ when(tableDescriptor.zoneId()).thenReturn(ZONE_ID);
when(partitionAccess1.tableId()).thenReturn(TABLE_ID_1);
when(partitionAccess2.tableId()).thenReturn(TABLE_ID_2);
-
when(partitionAccess2.committedGroupConfiguration()).thenReturn(mock(RaftGroupConfiguration.class));
+
when(partitionAccess2.committedGroupConfiguration()).thenReturn(raftGroupConfiguration);
- OutgoingSnapshotsManager outgoingSnapshotsManager =
mock(OutgoingSnapshotsManager.class);
doAnswer(invocation -> {
OutgoingSnapshot snapshot = invocation.getArgument(1);
@@ -65,11 +81,6 @@ public class OutgoingSnapshotReaderTest extends
BaseIgniteAbstractTest {
return null;
}).when(outgoingSnapshotsManager).startOutgoingSnapshot(any(), any());
- CatalogService catalogService = mock(CatalogService.class);
- when(catalogService.catalog(anyInt())).thenReturn(mock(Catalog.class));
-
- PartitionTxStateAccess txStateAccess =
mock(PartitionTxStateAccess.class);
-
var partitionsByTableId = new
Int2ObjectOpenHashMap<PartitionMvStorageAccess>();
partitionsByTableId.put(TABLE_ID_1, partitionAccess1);
@@ -93,7 +104,7 @@ public class OutgoingSnapshotReaderTest extends
BaseIgniteAbstractTest {
when(txStateAccess.lastAppliedIndex()).thenReturn(10L);
when(txStateAccess.lastAppliedTerm()).thenReturn(1L);
- when(partitionAccess1.lastAppliedTerm()).thenReturn(2L);
+ lenient().when(partitionAccess1.lastAppliedTerm()).thenReturn(2L);
when(partitionAccess2.lastAppliedTerm()).thenReturn(3L);
try (var reader = new OutgoingSnapshotReader(snapshotStorage)) {
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
index 6df959128f3..1dbb6fc6369 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
@@ -217,7 +217,7 @@ class SnapshotAwarePartitionDataStorageTest extends
BaseIgniteAbstractTest {
void notYetPassedRowIsEnqueued(MvWriteAction writeAction) {
when(partitionSnapshots.ongoingSnapshots()).thenReturn(List.of(snapshot));
- doReturn(false).when(snapshot).alreadyPassed(eq(TABLE_ID), any());
+ doReturn(false).when(snapshot).alreadyPassedOrIrrelevant(eq(TABLE_ID),
any());
doReturn(true).when(snapshot).addRowIdToSkip(any());
writeAction.executeOn(testedStorage, rowId);
@@ -230,7 +230,7 @@ class SnapshotAwarePartitionDataStorageTest extends
BaseIgniteAbstractTest {
void notYetPassedRowNotEnqueuedSecondTime(MvWriteAction writeAction) {
when(partitionSnapshots.ongoingSnapshots()).thenReturn(List.of(snapshot));
- doReturn(false).when(snapshot).alreadyPassed(eq(TABLE_ID), any());
+ doReturn(false).when(snapshot).alreadyPassedOrIrrelevant(eq(TABLE_ID),
any());
doReturn(false).when(snapshot).addRowIdToSkip(any());
writeAction.executeOn(testedStorage, rowId);
@@ -243,7 +243,7 @@ class SnapshotAwarePartitionDataStorageTest extends
BaseIgniteAbstractTest {
void alreadyPassedRowNotEnqueued(MvWriteAction writeAction) {
when(partitionSnapshots.ongoingSnapshots()).thenReturn(List.of(snapshot));
- doReturn(true).when(snapshot).alreadyPassed(eq(TABLE_ID), any());
+ doReturn(true).when(snapshot).alreadyPassedOrIrrelevant(eq(TABLE_ID),
any());
writeAction.executeOn(testedStorage, rowId);
@@ -263,7 +263,7 @@ class SnapshotAwarePartitionDataStorageTest extends
BaseIgniteAbstractTest {
}
private static void
configureSnapshotToLetEnqueueOutOfOrderMvRow(OutgoingSnapshot
snapshotToConfigure) {
- doReturn(false).when(snapshotToConfigure).alreadyPassed(eq(TABLE_ID),
any());
+
doReturn(false).when(snapshotToConfigure).alreadyPassedOrIrrelevant(eq(TABLE_ID),
any());
doReturn(true).when(snapshotToConfigure).addRowIdToSkip(any());
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
index 246b00e6424..c73c6bd897c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
@@ -175,7 +175,7 @@ public class SnapshotAwarePartitionDataStorage implements
PartitionDataStorage {
snapshot.acquireMvLock();
try {
- if (snapshot.alreadyPassed(tableId, rowId)) {
+ if (snapshot.alreadyPassedOrIrrelevant(tableId, rowId)) {
// Row already sent, or not relevant to this snapshot.
continue;
}