This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a5853bb95 IGNITE-18863 Add a test for a leader change during full rebalance (#1712)
2a5853bb95 is described below

commit 2a5853bb9590cddbabc01fd5f46cddb252906ce8
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Feb 24 11:07:46 2023 +0300

    IGNITE-18863 Add a test for a leader change during full rebalance (#1712)
---
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     | 91 +++++++++++++++++++++-
 1 file changed, 87 insertions(+), 4 deletions(-)

diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 4de963abce..c6789db457 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -43,6 +43,7 @@ import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.logging.Handler;
 import java.util.logging.LogRecord;
 import java.util.logging.Logger;
 import java.util.stream.IntStream;
@@ -80,6 +81,7 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -131,14 +133,24 @@ class ItTableRaftSnapshotsTest extends BaseIgniteAbstractTest {
 
     private Cluster cluster;
 
+    /** JUL logger registered under the {@link Replicator} class name; looked up in {@link #createCluster(TestInfo)}. */
+    private Logger replicatorLogger;
+
+    /**
+     * Handler registered on {@link #replicatorLogger} by {@link #listenForSnapshotInstalledSuccessFromLogger}, or
+     * {@code null} if none is attached; {@link #shutdownCluster()} removes it if it is still registered.
+     */
+    private @Nullable Handler replicaLoggerHandler;
+
     @BeforeEach
     void createCluster(TestInfo testInfo) {
         cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+
+        // Logger.getLogger returns the shared JUL logger for this name; presumably the one Replicator's log records reach.
+        replicatorLogger = Logger.getLogger(Replicator.class.getName());
     }
 
     @AfterEach
     @Timeout(60)
     void shutdownCluster() {
+        // A listener whose expected log message never arrived is still attached to the shared logger;
+        // detach it so it does not outlive this test.
+        if (replicaLoggerHandler != null) {
+            replicatorLogger.removeHandler(replicaLoggerHandler);
+        }
+
         cluster.shutdown();
     }
 
@@ -415,8 +427,6 @@ class ItTableRaftSnapshotsTest extends BaseIgniteAbstractTest {
     private void reanimateNodeAndWaitForSnapshotInstalled(int nodeIndex, 
NodeKnockout knockout) throws InterruptedException {
         CountDownLatch snapshotInstalledLatch = new CountDownLatch(1);
 
-        Logger replicatorLogger = Logger.getLogger(Replicator.class.getName());
-
         var handler = new NoOpHandler() {
             @Override
             public void publish(LogRecord record) {
@@ -666,8 +676,7 @@ class ItTableRaftSnapshotsTest extends BaseIgniteAbstractTest {
     private BiPredicate<String, NetworkMessage> 
dropFirstSnapshotMetaResponse() {
         AtomicBoolean sentSnapshotMetaResponse = new AtomicBoolean(false);
 
-        return dropFirstSnapshotMetaResponse(
-                sentSnapshotMetaResponse);
+        // A fresh flag per call, so filters created by separate calls never share state.
+        return dropFirstSnapshotMetaResponse(sentSnapshotMetaResponse);
     }
 
     private BiPredicate<String, NetworkMessage> 
dropFirstSnapshotMetaResponse(AtomicBoolean sentSnapshotMetaResponse) {
@@ -680,6 +689,19 @@ class ItTableRaftSnapshotsTest extends BaseIgniteAbstractTest {
         };
     }
 
+    /**
+     * Creates a message filter that drops every {@link SnapshotMetaResponse} addressed to node 2 and completes the
+     * given future once such a response has been intercepted.
+     *
+     * @param sentFirstSnapshotMetaResponse Future completed when a snapshot meta response to node 2 is dropped.
+     * @return Predicate over (target consistent ID, message) returning {@code true} for messages that must be dropped.
+     */
+    private BiPredicate<String, NetworkMessage> dropSnapshotMetaResponse(CompletableFuture<Void> sentFirstSnapshotMetaResponse) {
+        return (targetConsistentId, message) -> {
+            boolean addressedToNode2 = Objects.equals(targetConsistentId, cluster.node(2).name());
+
+            if (!addressedToNode2 || !(message instanceof SnapshotMetaResponse)) {
+                return false;
+            }
+
+            sentFirstSnapshotMetaResponse.complete(null);
+
+            // Never let a snapshot meta response reach node 2.
+            return true;
+        };
+    }
+
     /**
      * This is a test for a tricky scenario:
      *
@@ -732,4 +754,65 @@ class ItTableRaftSnapshotsTest extends BaseIgniteAbstractTest {
             
snapshotExecutorLogger.removeHandler(snapshotInstallFailedDueToIdenticalRetryHandler);
         }
     }
+
+    /**
+     * Checks that node 2 still receives a full snapshot when the leadership changes while an InstallSnapshot from the
+     * previous leader (node 1) is stuck in the "middle" because its SnapshotMetaResponse never reaches node 2.
+     */
+    @Test
+    void testChangeLeaderOnInstallSnapshotInMiddle() throws Exception {
+        CompletableFuture<Void> sentSnapshotMetaResponseFromNode1Future = new CompletableFuture<>();
+
+        prepareClusterForInstallingSnapshotToNode2(NodeKnockout.PARTITION_NETWORK, DEFAULT_STORAGE_ENGINE, cluster -> {
+            // Make the InstallSnapshot from the leader with index 1 hang in the "middle": node 1 drops every
+            // SnapshotMetaResponse addressed to node 2.
+            cluster.node(1).dropMessages(dropSnapshotMetaResponse(sentSnapshotMetaResponseFromNode1Future));
+        });
+
+        // Move the leadership to node 1 and truncate its log, so node 2 has to catch up via InstallSnapshot
+        // instead of AppendEntries.
+        transferLeadershipOnSolePartitionTo(1);
+
+        causeLogTruncationOnSolePartitionLeader();
+
+        CompletableFuture<Void> installSnapshotSuccessfulFuture = new CompletableFuture<>();
+
+        // Register the listener before node 2 returns, so the success message from node 0 cannot be missed.
+        listenForSnapshotInstalledSuccessFromLogger(0, 2, installSnapshotSuccessfulFuture);
+
+        // Return node 2 to the cluster.
+        cluster.reanimateNode(2, NodeKnockout.PARTITION_NETWORK);
+
+        // Wait for the InstallSnapshot from node 1 to node 2 to hang in the "middle".
+        assertThat(sentSnapshotMetaResponseFromNode1Future, willSucceedIn(1, TimeUnit.MINUTES));
+
+        // Move the leadership to node 0 while that InstallSnapshot is still stuck.
+        transferLeadershipOnSolePartitionTo(0);
+
+        // Wait for node 0 to successfully install the snapshot on node 2.
+        assertThat(installSnapshotSuccessfulFuture, willSucceedIn(1, TimeUnit.MINUTES));
+
+        // Make sure the rebalancing is complete: query through node 2 and expect the data written earlier.
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+    }
+
+    /**
+     * Adds a listener for the {@link #replicatorLogger} to hear the success 
of the snapshot installation.
+     */
+    private void listenForSnapshotInstalledSuccessFromLogger(
+            int nodeIndexFrom,
+            int nodeIndexTo,
+            CompletableFuture<Void> snapshotInstallSuccessfullyFuture
+    ) {
+        String regexp = "Node .+" + nodeIndexFrom + " received 
InstallSnapshotResponse from .+_" + nodeIndexTo + " .+ success=true";
+
+        replicaLoggerHandler = new NoOpHandler() {
+            @Override
+            public void publish(LogRecord record) {
+                if (record.getMessage().matches(regexp)) {
+                    snapshotInstallSuccessfullyFuture.complete(null);
+
+                    replicatorLogger.removeHandler(this);
+                    replicaLoggerHandler = null;
+                }
+            }
+        };
+
+        replicatorLogger.addHandler(replicaLoggerHandler);
+    }
 }

Reply via email to