This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-27104 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 5c8ba4ab1d8244b415fb36bd3d4b0f543bd7e46f Author: Kirill Tkalenko <[email protected]> AuthorDate: Wed Nov 19 17:48:56 2025 +0300 IGNITE-27104 wip --- .../raftsnapshot/ItTableRaftSnapshotsTest.java | 103 ++++++++++++++++++++- modules/runner/build.gradle | 1 + .../java/org/apache/ignite/internal/Cluster.java | 44 ++++++--- 3 files changed, 132 insertions(+), 16 deletions(-) diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java index d1e7042e9de..4484658fef6 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java @@ -68,6 +68,7 @@ import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry; import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse; +import org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse; import org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier; import org.apache.ignite.internal.placementdriver.ReplicaMeta; import org.apache.ignite.internal.raft.server.RaftServer; @@ -107,6 +108,7 @@ import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; @@ -379,19 +381,39 @@ class ItTableRaftSnapshotsTest extends ClusterPerTestIntegrationTest { 
doSnapshotOnSolePartitionLeader(expectedLeaderNodeIndex, true); } + /** + * Causes log truncation on the given node (which is not guaranteed to be a leader or a learner) for the RAFT group of the sole table + * partition that exists in the cluster. After such truncation, when a knocked-out follower gets reanimated, the leader will not be able + * to feed it with AppendEntries (because the leader no longer has the index that is required to send AppendEntries + * to the lagging follower), so the leader will have to send InstallSnapshot instead. + */ + private void causeLogTruncationOnSolePartition(int nodeIndex) { + doSnapshotOnSolePartition(nodeIndex, true); + } + /** * Causes a RAFT snapshot to be taken on the RAFT leader of the sole table partition that exists in the cluster. */ private void doSnapshotOnSolePartitionLeader(int expectedLeaderNodeIndex, boolean forced) throws Exception { ReplicationGroupId replicationGroupId = cluster.solePartitionId(TEST_ZONE_NAME, TEST_TABLE_NAME); - doSnapshotOn(replicationGroupId, expectedLeaderNodeIndex, forced); + doSnapshotOnLeader(replicationGroupId, expectedLeaderNodeIndex, forced); + } + + /** + * Causes a RAFT snapshot to be taken on the given node (which is not guaranteed to be a leader or a learner) for the RAFT group of + * the sole table partition that exists in the cluster. + */ + private void doSnapshotOnSolePartition(int nodeIndex, boolean forced) { + ReplicationGroupId replicationGroupId = cluster.solePartitionId(TEST_ZONE_NAME, TEST_TABLE_NAME); + + doSnapshotOn(replicationGroupId, nodeIndex, forced); } /** * Takes a RAFT snapshot on the leader of the RAFT group corresponding to the given table partition. 
*/ - private void doSnapshotOn(ReplicationGroupId replicationGroupId, int expectedLeaderNodeIndex, boolean forced) throws Exception { + private void doSnapshotOnLeader(ReplicationGroupId replicationGroupId, int expectedLeaderNodeIndex, boolean forced) throws Exception { RaftGroupService raftGroupService = cluster.leaderServiceFor(replicationGroupId); assertThat( @@ -399,11 +421,22 @@ class ItTableRaftSnapshotsTest extends ClusterPerTestIntegrationTest { raftGroupService.getServerId().getConsistentId(), is(cluster.node(expectedLeaderNodeIndex).name()) ); - CompletableFuture<Status> fut = new CompletableFuture<>(); + doSnapshotOn(raftGroupService, forced); + } + + /** Takes a RAFT snapshot of the given RAFT group on the given node (which is not guaranteed to be a leader or a learner). */ + private void doSnapshotOn(ReplicationGroupId replicationGroupId, int nodeIndex, boolean forced) { + RaftGroupService raftGroupService = cluster.raftGroupServiceFor(nodeIndex, replicationGroupId); + + doSnapshotOn(raftGroupService, forced); + } + + private static void doSnapshotOn(RaftGroupService raftGroupService, boolean forced) { + var fut = new CompletableFuture<Status>(); raftGroupService.getRaftNode().snapshot(fut::complete, forced); assertThat(fut, willCompleteSuccessfully()); - assertEquals(RaftError.SUCCESS, fut.get().getRaftError()); + assertEquals(RaftError.SUCCESS, fut.join().getRaftError()); } /** @@ -613,6 +646,24 @@ class ItTableRaftSnapshotsTest extends ClusterPerTestIntegrationTest { }; } + private BiPredicate<String, NetworkMessage> dropSnapshotMvDataResponse( + int targetNodeIndex, + CompletableFuture<Void> sentFirstSnapshotMvDataResponse + ) { + String node2Name = cluster.node(targetNodeIndex).name(); + + return (targetConsistentId, message) -> { + if (Objects.equals(targetConsistentId, node2Name) && message instanceof SnapshotMvDataResponse) { + sentFirstSnapshotMvDataResponse.complete(null); + + // Always drop. 
+ return true; + } else { + return false; + } + }; + } + /** * This is a test for a tricky scenario: * @@ -712,6 +763,50 @@ class ItTableRaftSnapshotsTest extends ClusterPerTestIntegrationTest { assertThat(getFromNode(2, 1), is("one")); } + @Disabled("https://issues.apache.org/jira/browse/IGNITE-26849") + @Test + void testRestartNodeAfterTruncateRaftLogPrefixAndAbortRebalance() throws Exception { + createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE); + + transferLeadershipOnSolePartitionTo(0); + + putToNode(0, 1, "one"); + putToNode(0, 2, "two"); + + // Let's take RAFT snapshots and truncate RAFT log prefix on all nodes. + causeLogTruncationOnSolePartition(0); + causeLogTruncationOnSolePartition(1); + causeLogTruncationOnSolePartition(2); + + // We will cancel (stopping node) the rebalance immediately after it starts for storages. + var sentFirstSnapshotMvDataResponseFormNode0Future = new CompletableFuture<Void>(); + unwrapIgniteImpl(cluster.node(0)).dropMessages(dropSnapshotMvDataResponse(2, sentFirstSnapshotMvDataResponseFormNode0Future)); + + knockoutNode(2); + + // Let's add more inserts and truncate the RAFT log to initiate a rebalance on the returning node. + putToNode(0, 3, "three"); + putToNode(0, 4, "four"); + + causeLogTruncationOnSolePartition(0); + + reanimateNode(2); + + assertThat(sentFirstSnapshotMvDataResponseFormNode0Future, willSucceedIn(1, TimeUnit.MINUTES)); + + knockoutNode(2); + + // Let's try to return the node without stopping the rebalancing. + unwrapIgniteImpl(cluster.node(0)).stopDroppingMessages(); + + reanimateNode(2); + + assertThat(getFromNode(2, 1), is("one")); + assertThat(getFromNode(2, 2), is("two")); + assertThat(getFromNode(2, 3), is("three")); + assertThat(getFromNode(2, 4), is("four")); + } + /** * Adds a listener for the {@link #replicatorLogInspector} to hear the success of the snapshot installation. 
*/ diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle index 04837e3d48a..558b3b6400b 100644 --- a/modules/runner/build.gradle +++ b/modules/runner/build.gradle @@ -233,6 +233,7 @@ dependencies { testFixturesImplementation testFixtures(project(':ignite-table')) testFixturesImplementation testFixtures(project(':ignite-storage-api')) testFixturesImplementation libs.typesafe.config + testFixturesImplementation libs.awaitility } tasks.register('postRelease', JavaExec) { diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java index ab33a81f6b4..8225608e18b 100644 --- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java +++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java @@ -32,6 +32,7 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.util.CollectionUtils.setListAtIndex; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -47,7 +48,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -73,12 +73,12 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.placementdriver.ReplicaMeta; -import 
org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.server.impl.JraftServerImpl; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.sql.SqlCommon; import org.apache.ignite.internal.table.NodeUtils; import org.apache.ignite.internal.testframework.TestIgnitionManager; +import org.apache.ignite.raft.jraft.Node; import org.apache.ignite.raft.jraft.RaftGroupService; import org.apache.ignite.raft.jraft.Status; import org.apache.ignite.raft.jraft.core.NodeImpl; @@ -592,20 +592,40 @@ public class Cluster { return result; } - @Nullable - private RaftGroupService currentLeaderServiceFor(ReplicationGroupId groupId) { + /** + * Returns the {@link RaftGroupService} of the given RAFT group on the given node; the node is not guaranteed to be a + * {@link Node#isLeader() leader} or a learner. + */ + public RaftGroupService raftGroupServiceFor(int nodeIndex, ReplicationGroupId groupId) { + var res = new AtomicReference<RaftGroupService>(); + + await().atMost(10, SECONDS).until(() -> { + RaftGroupService service = raftGroupService(unwrapIgniteImpl(node(nodeIndex)), groupId); + + res.set(service); + + return service != null; + }); + + return res.get(); + } + + private @Nullable RaftGroupService currentLeaderServiceFor(ReplicationGroupId groupId) { return runningNodes() .map(TestWrappers::unwrapIgniteImpl) - .flatMap(ignite -> { - JraftServerImpl server = (JraftServerImpl) ignite.raftManager().server(); + .map(igniteImpl -> raftGroupService(igniteImpl, groupId)) + .filter(Objects::nonNull) + .filter(service -> service.getRaftNode().isLeader()) + .findAny() + .orElse(null); + } - Optional<RaftNodeId> maybeRaftNodeId = server.localNodes().stream() - .filter(nodeId -> nodeId.groupId().equals(groupId)) - .findAny(); + private static @Nullable RaftGroupService raftGroupService(IgniteImpl igniteImpl, ReplicationGroupId groupId) { + JraftServerImpl server = (JraftServerImpl) igniteImpl.raftManager().server(); - return 
maybeRaftNodeId.map(server::raftGroupService).stream(); - }) - .filter(service -> service.getRaftNode().isLeader()) + return server.localNodes().stream() + .filter(nodeId -> nodeId.groupId().equals(groupId)) + .map(server::raftGroupService) .findAny() .orElse(null); }
