This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a44b6797fa IGNITE-18430 Add integration test for a case when RAFT
snapshot installation is cancelled in the middle (#1622)
a44b6797fa is described below
commit a44b6797fa552866095dd50b48957c82470308d7
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Feb 3 12:43:56 2023 +0300
IGNITE-18430 Add integration test for a case when RAFT snapshot
installation is cancelled in the middle (#1622)
---
.../snapshot/incoming/IncomingSnapshotCopier.java | 3 +-
.../incoming/IncomingSnapshotCopierTest.java | 250 ++++++++++++++++-----
2 files changed, 200 insertions(+), 53 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index 3e05c7f320..824fb9c322 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot.incoming;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import java.nio.ByteBuffer;
import java.util.concurrent.CancellationException;
@@ -345,7 +346,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
setError(RaftError.UNKNOWN, throwable.getMessage());
}
- return partitionSnapshotStorage.partition().abortRebalance();
+ return
partitionSnapshotStorage.partition().abortRebalance().thenCompose(unused ->
failedFuture(throwable));
}
SnapshotMeta meta = snapshotMeta;
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 7e6cd7c60d..75daee6e8f 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -20,6 +20,8 @@ package
org.apache.ignite.internal.table.distributed.raft.snapshot.incoming;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITED;
@@ -29,8 +31,11 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -58,11 +63,13 @@ import
org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.TableRow;
import org.apache.ignite.internal.schema.TableRowBuilder;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
@@ -78,6 +85,7 @@ import
org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
@@ -138,21 +146,15 @@ public class IncomingSnapshotCopierTest {
long expLastAppliedIndex = 100500L;
long expLastAppliedTerm = 100L;
- RaftGroupConfiguration expLastGroupConfig = new RaftGroupConfiguration(
- List.of("peer"),
- List.of("learner"),
- List.of("old-peer"),
- List.of("old-learner")
- );
+ RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig();
- List<RowId> rowIds =
fillMvPartitionStorage(outgoingMvPartitionStorage);
- List<UUID> txIds =
fillTxStatePartitionStorage(outgoingTxStatePartitionStorage);
+ List<RowId> rowIds = generateRowIds();
+ List<UUID> txIds = generateTxIds();
- outgoingMvPartitionStorage.lastApplied(expLastAppliedIndex,
expLastAppliedTerm);
-
outgoingMvPartitionStorage.committedGroupConfiguration(expLastGroupConfig);
- outgoingTxStatePartitionStorage.lastApplied(expLastAppliedIndex,
expLastAppliedTerm);
+ fillMvPartitionStorage(outgoingMvPartitionStorage,
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
+ fillTxStatePartitionStorage(outgoingTxStatePartitionStorage,
expLastAppliedIndex, expLastAppliedTerm, txIds);
- MvTableStorage incomingMvTableStorage = spy(new
TestMvTableStorage(tablesConfig.tables().get("foo"), tablesConfig));
+ MvTableStorage incomingMvTableStorage = spy(new
TestMvTableStorage(getTableConfig(), tablesConfig));
TxStateTableStorage incomingTxStateTableStorage = spy(new
TestTxStateTableStorage());
incomingMvTableStorage.getOrCreateMvPartition(TEST_PARTITION);
@@ -263,66 +265,62 @@ public class IncomingSnapshotCopierTest {
outgoingSnapshotsManager,
SnapshotUri.toStringUri(snapshotId, NODE_NAME),
mock(RaftOptions.class),
- new PartitionAccessImpl(
+ spy(new PartitionAccessImpl(
new PartitionKey(UUID.randomUUID(), TEST_PARTITION),
incomingTableStorage,
incomingTxStateTableStorage,
List::of
- ),
+ )),
mock(SnapshotMeta.class),
executorService
);
}
- private static List<RowId> fillMvPartitionStorage(MvPartitionStorage
storage) {
- List<RowId> rowIds = List.of(
- new RowId(TEST_PARTITION),
- new RowId(TEST_PARTITION),
- new RowId(TEST_PARTITION),
- new RowId(TEST_PARTITION)
- );
+ private static void fillMvPartitionStorage(
+ MvPartitionStorage storage,
+ long lastAppliedIndex,
+ long lastAppliedTerm,
+ RaftGroupConfiguration raftGroupConfig,
+ List<RowId> rowIds
+ ) {
+ assertEquals(0, rowIds.size() % 2, "size=" + rowIds.size());
storage.runConsistently(() -> {
- // Writes committed version.
- storage.addWriteCommitted(rowIds.get(0), createRow("k0", "v0"),
HYBRID_CLOCK.now());
- storage.addWriteCommitted(rowIds.get(1), createRow("k1", "v1"),
HYBRID_CLOCK.now());
-
- storage.addWriteCommitted(rowIds.get(2), createRow("k20", "v20"),
HYBRID_CLOCK.now());
- storage.addWriteCommitted(rowIds.get(2), createRow("k21", "v21"),
HYBRID_CLOCK.now());
-
- // Writes an intent to write (uncommitted version).
- storage.addWrite(rowIds.get(2), createRow("k22", "v22"),
UUID.randomUUID(), UUID.randomUUID(), TEST_PARTITION);
-
- storage.addWrite(
- rowIds.get(3),
- createRow("k3", "v3"),
- UUID.randomUUID(),
- UUID.randomUUID(),
- TEST_PARTITION
- );
+ for (int i = 0; i < rowIds.size(); i++) {
+ if (i % 2 == 0) {
+ // Writes committed version.
+ storage.addWriteCommitted(rowIds.get(i), createRow("k" +
i, "v" + i), HYBRID_CLOCK.now());
+ } else {
+ // Writes an intent to write (uncommitted version).
+ storage.addWrite(rowIds.get(i), createRow("k" + i, "v" +
i), UUID.randomUUID(), UUID.randomUUID(), TEST_PARTITION);
+ }
+ }
+
+ storage.lastApplied(lastAppliedIndex, lastAppliedTerm);
+
+ storage.committedGroupConfiguration(raftGroupConfig);
return null;
});
-
- return rowIds;
}
- private static List<UUID> fillTxStatePartitionStorage(TxStateStorage
storage) {
- List<UUID> txIds = List.of(
- UUID.randomUUID(),
- UUID.randomUUID(),
- UUID.randomUUID(),
- UUID.randomUUID()
- );
+ private static void fillTxStatePartitionStorage(
+ TxStateStorage storage,
+ long lastAppliedIndex,
+ long lastAppliedTerm,
+ List<UUID> txIds
+ ) {
+ assertEquals(0, txIds.size() % 2, "size=" + txIds.size());
UUID tableId = UUID.randomUUID();
- storage.put(txIds.get(0), new TxMeta(COMMITED, List.of(new
TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
- storage.put(txIds.get(1), new TxMeta(COMMITED, List.of(new
TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
- storage.put(txIds.get(2), new TxMeta(ABORTED, List.of(new
TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
- storage.put(txIds.get(3), new TxMeta(ABORTED, List.of(new
TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
+ for (int i = 0; i < txIds.size(); i++) {
+ TxState txState = i % 2 == 0 ? COMMITED : ABORTED;
- return txIds;
+ storage.put(txIds.get(i), new TxMeta(txState, List.of(new
TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
+ }
+
+ storage.lastApplied(lastAppliedIndex, lastAppliedTerm);
}
private static List<ResponseEntry>
createSnapshotMvDataEntries(MvPartitionStorage storage, List<RowId> rowIds) {
@@ -446,5 +444,153 @@ public class IncomingSnapshotCopierTest {
});
assertThat(cancelAndJoinFuture, willSucceedIn(1, TimeUnit.SECONDS));
+
+ verify(partitionSnapshotStorage.partition()).abortRebalance();
+ }
+
+ @Test
+ void testCancelOnMiddleRebalance() {
+ MvPartitionStorage outgoingMvPartitionStorage = new
TestMvPartitionStorage(TEST_PARTITION);
+ TxStateStorage outgoingTxStatePartitionStorage = new
TestTxStateStorage();
+
+ long expLastAppliedIndex = 100500L;
+ long expLastAppliedTerm = 100L;
+ RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig();
+
+ List<RowId> rowIds = generateRowIds();
+ List<UUID> txIds = generateTxIds();
+
+ fillMvPartitionStorage(outgoingMvPartitionStorage,
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
+ fillTxStatePartitionStorage(outgoingTxStatePartitionStorage,
expLastAppliedIndex, expLastAppliedTerm, txIds);
+
+ MvTableStorage incomingMvTableStorage = spy(new
TestMvTableStorage(getTableConfig(), tablesConfig));
+ TxStateTableStorage incomingTxStateTableStorage = spy(new
TestTxStateTableStorage());
+
+ incomingMvTableStorage.getOrCreateMvPartition(TEST_PARTITION);
+ incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+
+ MessagingService messagingService =
messagingServiceForSuccessScenario(outgoingMvPartitionStorage,
+ outgoingTxStatePartitionStorage, expLastAppliedIndex,
expLastAppliedTerm, expLastGroupConfig, rowIds, txIds, snapshotId);
+
+ PartitionSnapshotStorage partitionSnapshotStorage =
createPartitionSnapshotStorage(
+ snapshotId,
+ incomingMvTableStorage,
+ incomingTxStateTableStorage,
+ messagingService
+ );
+
+ // Let's add a rebalance interruption in the middle.
+ CompletableFuture<Void> startAddWriteFuture = new
CompletableFuture<>();
+ CompletableFuture<Void> finishAddWriteFuture = new
CompletableFuture<>();
+
+ doAnswer(answer -> {
+ startAddWriteFuture.complete(null);
+
+ assertThat(finishAddWriteFuture, willCompleteSuccessfully());
+
+ return null;
+ }).when(partitionSnapshotStorage.partition())
+ .addWrite(any(RowId.class), any(TableRow.class),
any(UUID.class), any(UUID.class), anyInt());
+
+ // Let's start rebalancing.
+ SnapshotCopier snapshotCopier =
partitionSnapshotStorage.startToCopyFrom(
+ SnapshotUri.toStringUri(snapshotId, NODE_NAME),
+ mock(SnapshotCopierOptions.class)
+ );
+
+ // Let's try to cancel it in the middle of the rebalance.
+ CompletableFuture<?> cancelRebalanceFuture = runAsync(() -> {
+ assertThat(startAddWriteFuture, willCompleteSuccessfully());
+
+ CompletableFuture<?> cancelCopierFuture = runAsync(() ->
finishAddWriteFuture.complete(null));
+
+ snapshotCopier.cancel();
+
+ snapshotCopier.join();
+
+ assertThat(cancelCopierFuture, willCompleteSuccessfully());
+ });
+
+ assertThat(cancelRebalanceFuture, willCompleteSuccessfully());
+
+ verify(partitionSnapshotStorage.partition()).abortRebalance();
+ }
+
+ @Test
+ void testErrorInProcessOfRebalance() {
+ MvPartitionStorage outgoingMvPartitionStorage = new
TestMvPartitionStorage(TEST_PARTITION);
+ TxStateStorage outgoingTxStatePartitionStorage = new
TestTxStateStorage();
+
+ long expLastAppliedIndex = 100500L;
+ long expLastAppliedTerm = 100L;
+ RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig();
+
+ List<RowId> rowIds = generateRowIds();
+ List<UUID> txIds = generateTxIds();
+
+ fillMvPartitionStorage(outgoingMvPartitionStorage,
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
+ fillTxStatePartitionStorage(outgoingTxStatePartitionStorage,
expLastAppliedIndex, expLastAppliedTerm, txIds);
+
+ MvTableStorage incomingMvTableStorage = spy(new
TestMvTableStorage(getTableConfig(), tablesConfig));
+ TxStateTableStorage incomingTxStateTableStorage = spy(new
TestTxStateTableStorage());
+
+ incomingMvTableStorage.getOrCreateMvPartition(TEST_PARTITION);
+ incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+
+ MessagingService messagingService =
messagingServiceForSuccessScenario(outgoingMvPartitionStorage,
+ outgoingTxStatePartitionStorage, expLastAppliedIndex,
expLastAppliedTerm, expLastGroupConfig, rowIds, txIds, snapshotId);
+
+ PartitionSnapshotStorage partitionSnapshotStorage =
createPartitionSnapshotStorage(
+ snapshotId,
+ incomingMvTableStorage,
+ incomingTxStateTableStorage,
+ messagingService
+ );
+
+ // Let's add an error on the rebalance.
+
doThrow(StorageException.class).when(partitionSnapshotStorage.partition())
+ .addWrite(any(RowId.class), any(TableRow.class),
any(UUID.class), any(UUID.class), anyInt());
+
+ // Let's start rebalancing.
+ SnapshotCopier snapshotCopier =
partitionSnapshotStorage.startToCopyFrom(
+ SnapshotUri.toStringUri(snapshotId, NODE_NAME),
+ mock(SnapshotCopierOptions.class)
+ );
+
+ // Let's wait for an error on rebalancing.
+ assertThat(runAsync(snapshotCopier::join),
willFailFast(IllegalStateException.class));
+
+ verify(partitionSnapshotStorage.partition()).abortRebalance();
+ }
+
+ private TableConfiguration getTableConfig() {
+ return tablesConfig.tables().get("foo");
+ }
+
+ private static List<RowId> generateRowIds() {
+ return List.of(
+ new RowId(TEST_PARTITION),
+ new RowId(TEST_PARTITION),
+ new RowId(TEST_PARTITION),
+ new RowId(TEST_PARTITION)
+ );
+ }
+
+ private static List<UUID> generateTxIds() {
+ return List.of(
+ UUID.randomUUID(),
+ UUID.randomUUID(),
+ UUID.randomUUID(),
+ UUID.randomUUID()
+ );
+ }
+
+ private static RaftGroupConfiguration generateRaftGroupConfig() {
+ return new RaftGroupConfiguration(
+ List.of("peer"),
+ List.of("learner"),
+ List.of("old-peer"),
+ List.of("old-learner")
+ );
}
}