This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2a5853bb95 IGNITE-18863 Add a test for a leader change during full rebalance (#1712)
2a5853bb95 is described below
commit 2a5853bb9590cddbabc01fd5f46cddb252906ce8
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Feb 24 11:07:46 2023 +0300
IGNITE-18863 Add a test for a leader change during full rebalance (#1712)
---
.../raftsnapshot/ItTableRaftSnapshotsTest.java | 91 +++++++++++++++++++++-
1 file changed, 87 insertions(+), 4 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 4de963abce..c6789db457 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -43,6 +43,7 @@ import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.logging.Handler;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.stream.IntStream;
@@ -80,6 +81,7 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -131,14 +133,24 @@ class ItTableRaftSnapshotsTest extends BaseIgniteAbstractTest {
private Cluster cluster;
+ private Logger replicatorLogger;
+
+ private @Nullable Handler replicaLoggerHandler;
+
@BeforeEach
void createCluster(TestInfo testInfo) {
cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
+
+ replicatorLogger = Logger.getLogger(Replicator.class.getName());
}
@AfterEach
@Timeout(60)
void shutdownCluster() {
+ if (replicaLoggerHandler != null) {
+ replicatorLogger.removeHandler(replicaLoggerHandler);
+ }
+
cluster.shutdown();
}
@@ -415,8 +427,6 @@ class ItTableRaftSnapshotsTest extends BaseIgniteAbstractTest {
private void reanimateNodeAndWaitForSnapshotInstalled(int nodeIndex,
NodeKnockout knockout) throws InterruptedException {
CountDownLatch snapshotInstalledLatch = new CountDownLatch(1);
- Logger replicatorLogger = Logger.getLogger(Replicator.class.getName());
-
var handler = new NoOpHandler() {
@Override
public void publish(LogRecord record) {
@@ -666,8 +676,7 @@ class ItTableRaftSnapshotsTest extends BaseIgniteAbstractTest {
private BiPredicate<String, NetworkMessage>
dropFirstSnapshotMetaResponse() {
AtomicBoolean sentSnapshotMetaResponse = new AtomicBoolean(false);
- return dropFirstSnapshotMetaResponse(
- sentSnapshotMetaResponse);
+ return dropFirstSnapshotMetaResponse(sentSnapshotMetaResponse);
}
private BiPredicate<String, NetworkMessage>
dropFirstSnapshotMetaResponse(AtomicBoolean sentSnapshotMetaResponse) {
@@ -680,6 +689,19 @@ class ItTableRaftSnapshotsTest extends BaseIgniteAbstractTest {
};
}
+    /**
+     * Builds a message filter that drops every {@link SnapshotMetaResponse} addressed to node 2.
+     *
+     * @param sentFirstSnapshotMetaResponse Future that gets completed when the first such response is intercepted
+     *      (subsequent {@code complete} calls are no-ops).
+     * @return Predicate over (target consistent ID, message) returning {@code true} for messages that must be dropped.
+     */
+    private BiPredicate<String, NetworkMessage> dropSnapshotMetaResponse(CompletableFuture<Void> sentFirstSnapshotMetaResponse) {
+        return (targetConsistentId, message) -> {
+            boolean addressedToNode2 = Objects.equals(targetConsistentId, cluster.node(2).name());
+
+            if (!addressedToNode2 || !(message instanceof SnapshotMetaResponse)) {
+                return false;
+            }
+
+            sentFirstSnapshotMetaResponse.complete(null);
+
+            // Drop every matching response, not only the first one, so the InstallSnapshot towards node 2 cannot progress.
+            return true;
+        };
+    }
+
/**
* This is a test for a tricky scenario:
*
@@ -732,4 +754,65 @@ class ItTableRaftSnapshotsTest extends BaseIgniteAbstractTest {
snapshotExecutorLogger.removeHandler(snapshotInstallFailedDueToIdenticalRetryHandler);
}
}
+
+    /**
+     * Checks that a full rebalance (InstallSnapshot) to node 2 still completes when leadership moves from node 1, whose
+     * InstallSnapshot is stuck in the "middle", to node 0.
+     */
+    @Test
+    void testChangeLeaderOnInstallSnapshotInMiddle() throws Exception {
+        CompletableFuture<Void> sentSnapshotMetaResponseFromNode1Future = new CompletableFuture<>();
+
+        prepareClusterForInstallingSnapshotToNode2(NodeKnockout.PARTITION_NETWORK, DEFAULT_STORAGE_ENGINE, cluster -> {
+            // Let's hang the InstallSnapshot in the "middle" from the leader with index 1.
+            cluster.node(1).dropMessages(dropSnapshotMetaResponse(sentSnapshotMetaResponseFromNode1Future));
+        });
+
+        // Change the leader and truncate its log so that InstallSnapshot occurs instead of AppendEntries.
+        transferLeadershipOnSolePartitionTo(1);
+
+        causeLogTruncationOnSolePartitionLeader();
+
+        CompletableFuture<Void> installSnapshotSuccessfulFuture = new CompletableFuture<>();
+
+        // Register the listener before node 2 comes back so the success record cannot be missed.
+        listenForSnapshotInstalledSuccessFromLogger(0, 2, installSnapshotSuccessfulFuture);
+
+        // Bring node 2 back.
+        cluster.reanimateNode(2, NodeKnockout.PARTITION_NETWORK);
+
+        // Waiting for the InstallSnapshot from node 1 to node 2 to hang in the "middle".
+        assertThat(sentSnapshotMetaResponseFromNode1Future, willSucceedIn(1, TimeUnit.MINUTES));
+
+        // Change the leader to node 0.
+        transferLeadershipOnSolePartitionTo(0);
+
+        // Waiting for the InstallSnapshot from node 0 to node 2 to complete successfully.
+        assertThat(installSnapshotSuccessfulFuture, willSucceedIn(1, TimeUnit.MINUTES));
+
+        // Make sure the rebalancing is complete.
+        List<IgniteBiTuple<Integer, String>> rows = queryWithRetry(2, "select * from test", ItTableRaftSnapshotsTest::readRows);
+
+        assertThat(rows, is(List.of(new IgniteBiTuple<>(1, "one"))));
+    }
+
+ /**
+ * Adds a listener for the {@link #replicatorLogger} to hear the success
of the snapshot installation.
+ */
+ private void listenForSnapshotInstalledSuccessFromLogger(
+ int nodeIndexFrom,
+ int nodeIndexTo,
+ CompletableFuture<Void> snapshotInstallSuccessfullyFuture
+ ) {
+ String regexp = "Node .+" + nodeIndexFrom + " received
InstallSnapshotResponse from .+_" + nodeIndexTo + " .+ success=true";
+
+ replicaLoggerHandler = new NoOpHandler() {
+ @Override
+ public void publish(LogRecord record) {
+ if (record.getMessage().matches(regexp)) {
+ snapshotInstallSuccessfullyFuture.complete(null);
+
+ replicatorLogger.removeHandler(this);
+ replicaLoggerHandler = null;
+ }
+ }
+ };
+
+ replicatorLogger.addHandler(replicaLoggerHandler);
+ }
}