This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 565171b8804 IGNITE-27104 Add a reproducer of inconsistency to the raft
log after its truncating and rebalance abort (#7016)
565171b8804 is described below
commit 565171b8804cd0617ea0a4f1cff1e50103b48a2f
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Nov 19 20:02:55 2025 +0300
IGNITE-27104 Add a reproducer of inconsistency to the raft log after its
truncating and rebalance abort (#7016)
---
.../raftsnapshot/ItTableRaftSnapshotsTest.java | 104 ++++++++++++++++++++-
modules/runner/build.gradle | 1 +
.../java/org/apache/ignite/internal/Cluster.java | 44 ++++++---
3 files changed, 133 insertions(+), 16 deletions(-)
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index d1e7042e9de..280c7f736c9 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -68,6 +68,7 @@ import
org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.network.NetworkMessage;
import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse;
+import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.raft.server.RaftServer;
@@ -107,6 +108,7 @@ import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
@@ -379,19 +381,39 @@ class ItTableRaftSnapshotsTest extends
ClusterPerTestIntegrationTest {
doSnapshotOnSolePartitionLeader(expectedLeaderNodeIndex, true);
}
+ /**
+ * Forces a RAFT snapshot, and thus log prefix truncation, on the given node's replica of the RAFT group of the sole table
+ * partition that exists in the cluster (the node is not required to be the leader). If the node is the leader, then after such
+ * truncation, when a knocked-out follower gets reanimated, the leader will not be able to feed it with AppendEntries (because
+ * it no longer has the log entries required to do so for the lagging follower), so it will have to send InstallSnapshot instead.
+ */
+ private void causeLogTruncationOnSolePartition(int nodeIndex) {
+ doSnapshotOnSolePartition(nodeIndex, true);
+ }
+
/**
* Causes a RAFT snapshot to be taken on the RAFT leader of the sole table
partition that exists in the cluster.
*/
private void doSnapshotOnSolePartitionLeader(int expectedLeaderNodeIndex,
boolean forced) throws Exception {
ReplicationGroupId replicationGroupId =
cluster.solePartitionId(TEST_ZONE_NAME, TEST_TABLE_NAME);
- doSnapshotOn(replicationGroupId, expectedLeaderNodeIndex, forced);
+ doSnapshotOnLeader(replicationGroupId, expectedLeaderNodeIndex,
forced);
+ }
+
+ /**
+ * Causes a RAFT snapshot to be taken on the given node's replica of the RAFT group of the sole table partition that exists in the
+ * cluster (the node is not required to be the leader).
+ */
+ private void doSnapshotOnSolePartition(int nodeIndex, boolean forced) {
+ ReplicationGroupId replicationGroupId =
cluster.solePartitionId(TEST_ZONE_NAME, TEST_TABLE_NAME);
+
+ doSnapshotOn(replicationGroupId, nodeIndex, forced);
}
/**
* Takes a RAFT snapshot on the leader of the RAFT group corresponding to
the given table partition.
*/
- private void doSnapshotOn(ReplicationGroupId replicationGroupId, int
expectedLeaderNodeIndex, boolean forced) throws Exception {
+ private void doSnapshotOnLeader(ReplicationGroupId replicationGroupId, int
expectedLeaderNodeIndex, boolean forced) throws Exception {
RaftGroupService raftGroupService =
cluster.leaderServiceFor(replicationGroupId);
assertThat(
@@ -399,11 +421,22 @@ class ItTableRaftSnapshotsTest extends
ClusterPerTestIntegrationTest {
raftGroupService.getServerId().getConsistentId(),
is(cluster.node(expectedLeaderNodeIndex).name())
);
- CompletableFuture<Status> fut = new CompletableFuture<>();
+ doSnapshotOn(raftGroupService, forced);
+ }
+
+ /** Takes a RAFT snapshot of the given RAFT group on the given node (the node is not required to be the leader). */
+ private void doSnapshotOn(ReplicationGroupId replicationGroupId, int
nodeIndex, boolean forced) {
+ RaftGroupService raftGroupService =
cluster.raftGroupServiceFor(nodeIndex, replicationGroupId);
+
+ doSnapshotOn(raftGroupService, forced);
+ }
+
+ private static void doSnapshotOn(RaftGroupService raftGroupService,
boolean forced) {
+ var fut = new CompletableFuture<Status>();
raftGroupService.getRaftNode().snapshot(fut::complete, forced);
assertThat(fut, willCompleteSuccessfully());
- assertEquals(RaftError.SUCCESS, fut.get().getRaftError());
+ assertEquals(RaftError.SUCCESS, fut.join().getRaftError());
}
/**
@@ -613,6 +646,24 @@ class ItTableRaftSnapshotsTest extends
ClusterPerTestIntegrationTest {
};
}
+ private BiPredicate<String, NetworkMessage> dropSnapshotMvDataResponse(
+ int targetNodeIndex,
+ CompletableFuture<Void> sentFirstSnapshotMvDataResponse
+ ) {
+ String targetNodeName = cluster.node(targetNodeIndex).name();
+
+ return (targetConsistentId, message) -> {
+ if (Objects.equals(targetConsistentId, targetNodeName) && message
instanceof SnapshotMvDataResponse) {
+ sentFirstSnapshotMvDataResponse.complete(null);
+
+ // Always drop.
+ return true;
+ } else {
+ return false;
+ }
+ };
+ }
+
/**
* This is a test for a tricky scenario:
*
@@ -712,6 +763,51 @@ class ItTableRaftSnapshotsTest extends
ClusterPerTestIntegrationTest {
assertThat(getFromNode(2, 1), is("one"));
}
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26849")
+ @Test
+ void testRestartNodeAfterTruncateRaftLogPrefixAndAbortRebalance() throws
Exception {
+ createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+ transferLeadershipOnSolePartitionTo(0);
+
+ putToNode(0, 1, "one");
+ putToNode(0, 2, "two");
+
+ // Let's take RAFT snapshots and truncate RAFT log prefix on all nodes.
+ causeLogTruncationOnSolePartition(0);
+ causeLogTruncationOnSolePartition(1);
+ causeLogTruncationOnSolePartition(2);
+
+ // Abort the rebalance (by stopping the node) immediately after it starts for the storages.
+ var sentFirstSnapshotMvDataResponseFormNode0Future = new
CompletableFuture<Void>();
+
unwrapIgniteImpl(cluster.node(0)).dropMessages(dropSnapshotMvDataResponse(2,
sentFirstSnapshotMvDataResponseFormNode0Future));
+
+ knockoutNode(2);
+
+ // Let's add more inserts and truncate the RAFT log to initiate a
rebalance on the returning node.
+ putToNode(0, 3, "three");
+ putToNode(0, 4, "four");
+
+ causeLogTruncationOnSolePartition(0);
+
+ reanimateNode(2);
+
+ // The timeout is this long for consistency with the neighboring tests.
+ assertThat(sentFirstSnapshotMvDataResponseFormNode0Future,
willSucceedIn(1, TimeUnit.MINUTES));
+
+ knockoutNode(2);
+
+ // Bring the node back, this time without interrupting the rebalance.
+ unwrapIgniteImpl(cluster.node(0)).stopDroppingMessages();
+
+ reanimateNode(2);
+
+ assertThat(getFromNode(2, 1), is("one"));
+ assertThat(getFromNode(2, 2), is("two"));
+ assertThat(getFromNode(2, 3), is("three"));
+ assertThat(getFromNode(2, 4), is("four"));
+ }
+
/**
* Adds a listener for the {@link #replicatorLogInspector} to hear the
success of the snapshot installation.
*/
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 04837e3d48a..558b3b6400b 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -233,6 +233,7 @@ dependencies {
testFixturesImplementation testFixtures(project(':ignite-table'))
testFixturesImplementation testFixtures(project(':ignite-storage-api'))
testFixturesImplementation libs.typesafe.config
+ testFixturesImplementation libs.awaitility
}
tasks.register('postRelease', JavaExec) {
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index ab33a81f6b4..8225608e18b 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -32,6 +32,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CollectionUtils.setListAtIndex;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -47,7 +48,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -73,12 +73,12 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
-import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.table.NodeUtils;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.core.NodeImpl;
@@ -592,20 +592,40 @@ public class Cluster {
return result;
}
- @Nullable
- private RaftGroupService currentLeaderServiceFor(ReplicationGroupId
groupId) {
+ /**
+ * Returns the {@link RaftGroupService} of the given RAFT group on the given node, waiting up to 10 seconds for it to appear.
+ * The node is not guaranteed to be the {@link Node#isLeader() leader} of the group.
+ */
+ public RaftGroupService raftGroupServiceFor(int nodeIndex,
ReplicationGroupId groupId) {
+ var res = new AtomicReference<RaftGroupService>();
+
+ await().atMost(10, SECONDS).until(() -> {
+ RaftGroupService service =
raftGroupService(unwrapIgniteImpl(node(nodeIndex)), groupId);
+
+ res.set(service);
+
+ return service != null;
+ });
+
+ return res.get();
+ }
+
+ private @Nullable RaftGroupService
currentLeaderServiceFor(ReplicationGroupId groupId) {
return runningNodes()
.map(TestWrappers::unwrapIgniteImpl)
- .flatMap(ignite -> {
- JraftServerImpl server = (JraftServerImpl)
ignite.raftManager().server();
+ .map(igniteImpl -> raftGroupService(igniteImpl, groupId))
+ .filter(Objects::nonNull)
+ .filter(service -> service.getRaftNode().isLeader())
+ .findAny()
+ .orElse(null);
+ }
- Optional<RaftNodeId> maybeRaftNodeId =
server.localNodes().stream()
- .filter(nodeId -> nodeId.groupId().equals(groupId))
- .findAny();
+ private static @Nullable RaftGroupService raftGroupService(IgniteImpl
igniteImpl, ReplicationGroupId groupId) {
+ JraftServerImpl server = (JraftServerImpl)
igniteImpl.raftManager().server();
- return
maybeRaftNodeId.map(server::raftGroupService).stream();
- })
- .filter(service -> service.getRaftNode().isLeader())
+ return server.localNodes().stream()
+ .filter(nodeId -> nodeId.groupId().equals(groupId))
+ .map(server::raftGroupService)
.findAny()
.orElse(null);
}