This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a44b6797fa IGNITE-18430 Add integration test for a case when RAFT snapshot installation is cancelled in the middle (#1622)
a44b6797fa is described below

commit a44b6797fa552866095dd50b48957c82470308d7
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Feb 3 12:43:56 2023 +0300

    IGNITE-18430 Add integration test for a case when RAFT snapshot installation is cancelled in the middle (#1622)
---
 .../snapshot/incoming/IncomingSnapshotCopier.java  |   3 +-
 .../incoming/IncomingSnapshotCopierTest.java       | 250 ++++++++++++++++-----
 2 files changed, 200 insertions(+), 53 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index 3e05c7f320..824fb9c322 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.table.distributed.raft.snapshot.incoming;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.CancellationException;
@@ -345,7 +346,7 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
                     setError(RaftError.UNKNOWN, throwable.getMessage());
                 }
 
-                return partitionSnapshotStorage.partition().abortRebalance();
+                return 
partitionSnapshotStorage.partition().abortRebalance().thenCompose(unused -> 
failedFuture(throwable));
             }
 
             SnapshotMeta meta = snapshotMeta;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 7e6cd7c60d..75daee6e8f 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -20,6 +20,8 @@ package 
org.apache.ignite.internal.table.distributed.raft.snapshot.incoming;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.apache.ignite.internal.tx.TxState.COMMITED;
@@ -29,8 +31,11 @@ import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -58,11 +63,13 @@ import 
org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.TableRow;
 import org.apache.ignite.internal.schema.TableRowBuilder;
+import org.apache.ignite.internal.schema.configuration.TableConfiguration;
 import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
 import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
@@ -78,6 +85,7 @@ import 
org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage;
@@ -138,21 +146,15 @@ public class IncomingSnapshotCopierTest {
 
         long expLastAppliedIndex = 100500L;
         long expLastAppliedTerm = 100L;
-        RaftGroupConfiguration expLastGroupConfig = new RaftGroupConfiguration(
-                List.of("peer"),
-                List.of("learner"),
-                List.of("old-peer"),
-                List.of("old-learner")
-        );
+        RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig();
 
-        List<RowId> rowIds = 
fillMvPartitionStorage(outgoingMvPartitionStorage);
-        List<UUID> txIds = 
fillTxStatePartitionStorage(outgoingTxStatePartitionStorage);
+        List<RowId> rowIds = generateRowIds();
+        List<UUID> txIds = generateTxIds();
 
-        outgoingMvPartitionStorage.lastApplied(expLastAppliedIndex, 
expLastAppliedTerm);
-        
outgoingMvPartitionStorage.committedGroupConfiguration(expLastGroupConfig);
-        outgoingTxStatePartitionStorage.lastApplied(expLastAppliedIndex, 
expLastAppliedTerm);
+        fillMvPartitionStorage(outgoingMvPartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
+        fillTxStatePartitionStorage(outgoingTxStatePartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, txIds);
 
-        MvTableStorage incomingMvTableStorage = spy(new 
TestMvTableStorage(tablesConfig.tables().get("foo"), tablesConfig));
+        MvTableStorage incomingMvTableStorage = spy(new 
TestMvTableStorage(getTableConfig(), tablesConfig));
         TxStateTableStorage incomingTxStateTableStorage = spy(new 
TestTxStateTableStorage());
 
         incomingMvTableStorage.getOrCreateMvPartition(TEST_PARTITION);
@@ -263,66 +265,62 @@ public class IncomingSnapshotCopierTest {
                 outgoingSnapshotsManager,
                 SnapshotUri.toStringUri(snapshotId, NODE_NAME),
                 mock(RaftOptions.class),
-                new PartitionAccessImpl(
+                spy(new PartitionAccessImpl(
                         new PartitionKey(UUID.randomUUID(), TEST_PARTITION),
                         incomingTableStorage,
                         incomingTxStateTableStorage,
                         List::of
-                ),
+                )),
                 mock(SnapshotMeta.class),
                 executorService
         );
     }
 
-    private static List<RowId> fillMvPartitionStorage(MvPartitionStorage 
storage) {
-        List<RowId> rowIds = List.of(
-                new RowId(TEST_PARTITION),
-                new RowId(TEST_PARTITION),
-                new RowId(TEST_PARTITION),
-                new RowId(TEST_PARTITION)
-        );
+    private static void fillMvPartitionStorage(
+            MvPartitionStorage storage,
+            long lastAppliedIndex,
+            long lastAppliedTerm,
+            RaftGroupConfiguration raftGroupConfig,
+            List<RowId> rowIds
+    ) {
+        assertEquals(0, rowIds.size() % 2, "size=" + rowIds.size());
 
         storage.runConsistently(() -> {
-            // Writes committed version.
-            storage.addWriteCommitted(rowIds.get(0), createRow("k0", "v0"), 
HYBRID_CLOCK.now());
-            storage.addWriteCommitted(rowIds.get(1), createRow("k1", "v1"), 
HYBRID_CLOCK.now());
-
-            storage.addWriteCommitted(rowIds.get(2), createRow("k20", "v20"), 
HYBRID_CLOCK.now());
-            storage.addWriteCommitted(rowIds.get(2), createRow("k21", "v21"), 
HYBRID_CLOCK.now());
-
-            // Writes an intent to write (uncommitted version).
-            storage.addWrite(rowIds.get(2), createRow("k22", "v22"), 
UUID.randomUUID(), UUID.randomUUID(), TEST_PARTITION);
-
-            storage.addWrite(
-                    rowIds.get(3),
-                    createRow("k3", "v3"),
-                    UUID.randomUUID(),
-                    UUID.randomUUID(),
-                    TEST_PARTITION
-            );
+            for (int i = 0; i < rowIds.size(); i++) {
+                if (i % 2 == 0) {
+                    // Writes committed version.
+                    storage.addWriteCommitted(rowIds.get(i), createRow("k" + 
i, "v" + i), HYBRID_CLOCK.now());
+                } else {
+                    // Writes an intent to write (uncommitted version).
+                    storage.addWrite(rowIds.get(i), createRow("k" + i, "v" + 
i), UUID.randomUUID(), UUID.randomUUID(), TEST_PARTITION);
+                }
+            }
+
+            storage.lastApplied(lastAppliedIndex, lastAppliedTerm);
+
+            storage.committedGroupConfiguration(raftGroupConfig);
 
             return null;
         });
-
-        return rowIds;
     }
 
-    private static List<UUID> fillTxStatePartitionStorage(TxStateStorage 
storage) {
-        List<UUID> txIds = List.of(
-                UUID.randomUUID(),
-                UUID.randomUUID(),
-                UUID.randomUUID(),
-                UUID.randomUUID()
-        );
+    private static void fillTxStatePartitionStorage(
+            TxStateStorage storage,
+            long lastAppliedIndex,
+            long lastAppliedTerm,
+            List<UUID> txIds
+    ) {
+        assertEquals(0, txIds.size() % 2, "size=" + txIds.size());
 
         UUID tableId = UUID.randomUUID();
 
-        storage.put(txIds.get(0), new TxMeta(COMMITED, List.of(new 
TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
-        storage.put(txIds.get(1), new TxMeta(COMMITED, List.of(new 
TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
-        storage.put(txIds.get(2), new TxMeta(ABORTED, List.of(new 
TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
-        storage.put(txIds.get(3), new TxMeta(ABORTED, List.of(new 
TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
+        for (int i = 0; i < txIds.size(); i++) {
+            TxState txState = i % 2 == 0 ? COMMITED : ABORTED;
 
-        return txIds;
+            storage.put(txIds.get(i), new TxMeta(txState, List.of(new 
TablePartitionId(tableId, TEST_PARTITION)), HYBRID_CLOCK.now()));
+        }
+
+        storage.lastApplied(lastAppliedIndex, lastAppliedTerm);
     }
 
     private static List<ResponseEntry> 
createSnapshotMvDataEntries(MvPartitionStorage storage, List<RowId> rowIds) {
@@ -446,5 +444,153 @@ public class IncomingSnapshotCopierTest {
         });
 
         assertThat(cancelAndJoinFuture, willSucceedIn(1, TimeUnit.SECONDS));
+
+        verify(partitionSnapshotStorage.partition()).abortRebalance();
+    }
+
+    @Test
+    void testCancelOnMiddleRebalance() {
+        MvPartitionStorage outgoingMvPartitionStorage = new 
TestMvPartitionStorage(TEST_PARTITION);
+        TxStateStorage outgoingTxStatePartitionStorage = new 
TestTxStateStorage();
+
+        long expLastAppliedIndex = 100500L;
+        long expLastAppliedTerm = 100L;
+        RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig();
+
+        List<RowId> rowIds = generateRowIds();
+        List<UUID> txIds = generateTxIds();
+
+        fillMvPartitionStorage(outgoingMvPartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
+        fillTxStatePartitionStorage(outgoingTxStatePartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, txIds);
+
+        MvTableStorage incomingMvTableStorage = spy(new 
TestMvTableStorage(getTableConfig(), tablesConfig));
+        TxStateTableStorage incomingTxStateTableStorage = spy(new 
TestTxStateTableStorage());
+
+        incomingMvTableStorage.getOrCreateMvPartition(TEST_PARTITION);
+        incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+
+        MessagingService messagingService = 
messagingServiceForSuccessScenario(outgoingMvPartitionStorage,
+                outgoingTxStatePartitionStorage, expLastAppliedIndex, 
expLastAppliedTerm, expLastGroupConfig, rowIds, txIds, snapshotId);
+
+        PartitionSnapshotStorage partitionSnapshotStorage = 
createPartitionSnapshotStorage(
+                snapshotId,
+                incomingMvTableStorage,
+                incomingTxStateTableStorage,
+                messagingService
+        );
+
+        // Let's add a rebalance interruption in the middle.
+        CompletableFuture<Void> startAddWriteFuture = new 
CompletableFuture<>();
+        CompletableFuture<Void> finishAddWriteFuture = new 
CompletableFuture<>();
+
+        doAnswer(answer -> {
+            startAddWriteFuture.complete(null);
+
+            assertThat(finishAddWriteFuture, willCompleteSuccessfully());
+
+            return null;
+        }).when(partitionSnapshotStorage.partition())
+                .addWrite(any(RowId.class), any(TableRow.class), 
any(UUID.class), any(UUID.class), anyInt());
+
+        // Let's start rebalancing.
+        SnapshotCopier snapshotCopier = 
partitionSnapshotStorage.startToCopyFrom(
+                SnapshotUri.toStringUri(snapshotId, NODE_NAME),
+                mock(SnapshotCopierOptions.class)
+        );
+
+        // Let's try to cancel it in the middle of the rebalance.
+        CompletableFuture<?> cancelRebalanceFuture = runAsync(() -> {
+            assertThat(startAddWriteFuture, willCompleteSuccessfully());
+
+            CompletableFuture<?> cancelCopierFuture = runAsync(() -> 
finishAddWriteFuture.complete(null));
+
+            snapshotCopier.cancel();
+
+            snapshotCopier.join();
+
+            assertThat(cancelCopierFuture, willCompleteSuccessfully());
+        });
+
+        assertThat(cancelRebalanceFuture, willCompleteSuccessfully());
+
+        verify(partitionSnapshotStorage.partition()).abortRebalance();
+    }
+
+    @Test
+    void testErrorInProcessOfRebalance() {
+        MvPartitionStorage outgoingMvPartitionStorage = new 
TestMvPartitionStorage(TEST_PARTITION);
+        TxStateStorage outgoingTxStatePartitionStorage = new 
TestTxStateStorage();
+
+        long expLastAppliedIndex = 100500L;
+        long expLastAppliedTerm = 100L;
+        RaftGroupConfiguration expLastGroupConfig = generateRaftGroupConfig();
+
+        List<RowId> rowIds = generateRowIds();
+        List<UUID> txIds = generateTxIds();
+
+        fillMvPartitionStorage(outgoingMvPartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, expLastGroupConfig, rowIds);
+        fillTxStatePartitionStorage(outgoingTxStatePartitionStorage, 
expLastAppliedIndex, expLastAppliedTerm, txIds);
+
+        MvTableStorage incomingMvTableStorage = spy(new 
TestMvTableStorage(getTableConfig(), tablesConfig));
+        TxStateTableStorage incomingTxStateTableStorage = spy(new 
TestTxStateTableStorage());
+
+        incomingMvTableStorage.getOrCreateMvPartition(TEST_PARTITION);
+        incomingTxStateTableStorage.getOrCreateTxStateStorage(TEST_PARTITION);
+
+        MessagingService messagingService = 
messagingServiceForSuccessScenario(outgoingMvPartitionStorage,
+                outgoingTxStatePartitionStorage, expLastAppliedIndex, 
expLastAppliedTerm, expLastGroupConfig, rowIds, txIds, snapshotId);
+
+        PartitionSnapshotStorage partitionSnapshotStorage = 
createPartitionSnapshotStorage(
+                snapshotId,
+                incomingMvTableStorage,
+                incomingTxStateTableStorage,
+                messagingService
+        );
+
+        // Let's add an error on the rebalance.
+        
doThrow(StorageException.class).when(partitionSnapshotStorage.partition())
+                .addWrite(any(RowId.class), any(TableRow.class), 
any(UUID.class), any(UUID.class), anyInt());
+
+        // Let's start rebalancing.
+        SnapshotCopier snapshotCopier = 
partitionSnapshotStorage.startToCopyFrom(
+                SnapshotUri.toStringUri(snapshotId, NODE_NAME),
+                mock(SnapshotCopierOptions.class)
+        );
+
+        // Let's wait for an error on rebalancing.
+        assertThat(runAsync(snapshotCopier::join), 
willFailFast(IllegalStateException.class));
+
+        verify(partitionSnapshotStorage.partition()).abortRebalance();
+    }
+
+    private TableConfiguration getTableConfig() {
+        return tablesConfig.tables().get("foo");
+    }
+
+    private static List<RowId> generateRowIds() {
+        return List.of(
+                new RowId(TEST_PARTITION),
+                new RowId(TEST_PARTITION),
+                new RowId(TEST_PARTITION),
+                new RowId(TEST_PARTITION)
+        );
+    }
+
+    private static List<UUID> generateTxIds() {
+        return List.of(
+                UUID.randomUUID(),
+                UUID.randomUUID(),
+                UUID.randomUUID(),
+                UUID.randomUUID()
+        );
+    }
+
+    private static RaftGroupConfiguration generateRaftGroupConfig() {
+        return new RaftGroupConfiguration(
+                List.of("peer"),
+                List.of("learner"),
+                List.of("old-peer"),
+                List.of("old-learner")
+        );
     }
 }

Reply via email to