This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-27104
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 5c8ba4ab1d8244b415fb36bd3d4b0f543bd7e46f
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Nov 19 17:48:56 2025 +0300

    IGNITE-27104 wip
---
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     | 103 ++++++++++++++++++++-
 modules/runner/build.gradle                        |   1 +
 .../java/org/apache/ignite/internal/Cluster.java   |  44 ++++++---
 3 files changed, 132 insertions(+), 16 deletions(-)

diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index d1e7042e9de..4484658fef6 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
 import org.apache.ignite.internal.network.NetworkMessage;
 import 
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse;
+import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMvDataResponse;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.incoming.IncomingSnapshotCopier;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.raft.server.RaftServer;
@@ -107,6 +108,7 @@ import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
@@ -379,19 +381,39 @@ class ItTableRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
         doSnapshotOnSolePartitionLeader(expectedLeaderNodeIndex, true);
     }
 
+    /**
+     * Causes log truncation on the replica of the sole table partition hosted by the given node, regardless of its RAFT role. If that
+     * replica is the leader, then after such truncation a knocked-out follower that gets reanimated cannot be fed with AppendEntries
+     * (because the leader no longer has the log index required to send AppendEntries to the lagging follower), so the leader will
+     * have to send InstallSnapshot instead.
+     */
+    private void causeLogTruncationOnSolePartition(int nodeIndex) {
+        doSnapshotOnSolePartition(nodeIndex, true);
+    }
+
     /**
      * Causes a RAFT snapshot to be taken on the RAFT leader of the sole table partition that exists in the cluster.
      */
     private void doSnapshotOnSolePartitionLeader(int expectedLeaderNodeIndex, boolean forced) throws Exception {
         ReplicationGroupId replicationGroupId = cluster.solePartitionId(TEST_ZONE_NAME, TEST_TABLE_NAME);

-        doSnapshotOn(replicationGroupId, expectedLeaderNodeIndex, forced);
+        doSnapshotOnLeader(replicationGroupId, expectedLeaderNodeIndex, forced);
+    }
+
+    /**
+     * Causes a RAFT snapshot to be taken on the replica of the sole table partition hosted by the given node, regardless of whether
+     * that replica is currently the RAFT leader.
+     */
+    private void doSnapshotOnSolePartition(int nodeIndex, boolean forced) {
+        ReplicationGroupId replicationGroupId = cluster.solePartitionId(TEST_ZONE_NAME, TEST_TABLE_NAME);
+
+        doSnapshotOn(replicationGroupId, nodeIndex, forced);
     }
 
     /**
      * Takes a RAFT snapshot on the leader of the RAFT group corresponding to the given table partition.
      */
-    private void doSnapshotOn(ReplicationGroupId replicationGroupId, int expectedLeaderNodeIndex, boolean forced) throws Exception {
+    private void doSnapshotOnLeader(ReplicationGroupId replicationGroupId, int expectedLeaderNodeIndex, boolean forced) throws Exception {
         RaftGroupService raftGroupService = cluster.leaderServiceFor(replicationGroupId);

         assertThat(
@@ -399,11 +421,22 @@ class ItTableRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
                 raftGroupService.getServerId().getConsistentId(), is(cluster.node(expectedLeaderNodeIndex).name())
         );

-        CompletableFuture<Status> fut = new CompletableFuture<>();
+        doSnapshotOn(raftGroupService, forced);
+    }
+
+    /** Takes a RAFT snapshot on the given node's replica of the given RAFT group, regardless of whether that replica is the leader. */
+    private void doSnapshotOn(ReplicationGroupId replicationGroupId, int nodeIndex, boolean forced) {
+        RaftGroupService raftGroupService = cluster.raftGroupServiceFor(nodeIndex, replicationGroupId);
+
+        doSnapshotOn(raftGroupService, forced);
+    }
+
+    private static void doSnapshotOn(RaftGroupService raftGroupService, boolean forced) {
+        var fut = new CompletableFuture<Status>();
         raftGroupService.getRaftNode().snapshot(fut::complete, forced);

         assertThat(fut, willCompleteSuccessfully());
-        assertEquals(RaftError.SUCCESS, fut.get().getRaftError());
+        assertEquals(RaftError.SUCCESS, fut.join().getRaftError());
     }
 
     /**
@@ -613,6 +646,24 @@ class ItTableRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
         };
     }
 
+    /** Returns a filter dropping every {@link SnapshotMvDataResponse} sent to the target node; completes the future on the first one. */
+    private BiPredicate<String, NetworkMessage> dropSnapshotMvDataResponse(
+            int targetNodeIndex,
+            CompletableFuture<Void> sentFirstSnapshotMvDataResponse
+    ) {
+        String targetNodeName = cluster.node(targetNodeIndex).name();
+
+        return (recipientConsistentId, message) -> {
+            if (Objects.equals(recipientConsistentId, targetNodeName) && message instanceof SnapshotMvDataResponse) {
+                sentFirstSnapshotMvDataResponse.complete(null);
+                // Always drop; the future only records that the first such response was intercepted.
+                return true;
+            }
+
+            return false;
+        };
+    }
+
     /**
      * This is a test for a tricky scenario:
      *
@@ -712,6 +763,50 @@ class ItTableRaftSnapshotsTest extends ClusterPerTestIntegrationTest {
         assertThat(getFromNode(2, 1), is("one"));
     }
 
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26849")
+    @Test
+    void testRestartNodeAfterTruncateRaftLogPrefixAndAbortRebalance() throws Exception {
+        createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
+
+        transferLeadershipOnSolePartitionTo(0);
+
+        putToNode(0, 1, "one");
+        putToNode(0, 2, "two");
+
+        // Take RAFT snapshots and truncate the RAFT log prefix on all nodes.
+        causeLogTruncationOnSolePartition(0);
+        causeLogTruncationOnSolePartition(1);
+        causeLogTruncationOnSolePartition(2);
+
+        // Abort the rebalance (by stopping node 2) right after it starts: node 0 drops all SnapshotMvDataResponses sent to node 2.
+        var sentFirstSnapshotMvDataResponseFromNode0Future = new CompletableFuture<Void>();
+        unwrapIgniteImpl(cluster.node(0)).dropMessages(dropSnapshotMvDataResponse(2, sentFirstSnapshotMvDataResponseFromNode0Future));
+
+        knockoutNode(2);
+
+        // Add more data and truncate the RAFT log on node 0 to initiate a rebalance on the returning node.
+        putToNode(0, 3, "three");
+        putToNode(0, 4, "four");
+
+        causeLogTruncationOnSolePartition(0);
+
+        reanimateNode(2);
+
+        assertThat(sentFirstSnapshotMvDataResponseFromNode0Future, willSucceedIn(1, TimeUnit.MINUTES));
+
+        knockoutNode(2);
+
+        // Bring the node back, this time without interfering with the rebalance.
+        unwrapIgniteImpl(cluster.node(0)).stopDroppingMessages();
+
+        reanimateNode(2);
+
+        assertThat(getFromNode(2, 1), is("one"));
+        assertThat(getFromNode(2, 2), is("two"));
+        assertThat(getFromNode(2, 3), is("three"));
+        assertThat(getFromNode(2, 4), is("four"));
+    }
+
     /**
      * Adds a listener for the {@link #replicatorLogInspector} to hear the 
success of the snapshot installation.
      */
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 04837e3d48a..558b3b6400b 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -233,6 +233,7 @@ dependencies {
     testFixturesImplementation testFixtures(project(':ignite-table'))
     testFixturesImplementation testFixtures(project(':ignite-storage-api'))
     testFixturesImplementation libs.typesafe.config
+    testFixturesImplementation libs.awaitility
 }
 
 tasks.register('postRelease', JavaExec) {
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index ab33a81f6b4..8225608e18b 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -32,6 +32,7 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.CollectionUtils.setListAtIndex;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -47,7 +48,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -73,12 +73,12 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
-import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.sql.SqlCommon;
 import org.apache.ignite.internal.table.NodeUtils;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
@@ -592,20 +592,40 @@ public class Cluster {
         return result;
     }
 
-    @Nullable
-    private RaftGroupService currentLeaderServiceFor(ReplicationGroupId groupId) {
+    /**
+     * Returns the {@link RaftGroupService} of the given group on the given node, waiting up to 10 seconds for it to appear. The
+     * returned service is not necessarily the {@link Node#isLeader() leader} of the group.
+     */
+    public RaftGroupService raftGroupServiceFor(int nodeIndex, ReplicationGroupId groupId) {
+        var res = new AtomicReference<RaftGroupService>();
+
+        await().atMost(10, SECONDS).until(() -> {
+            RaftGroupService service = raftGroupService(unwrapIgniteImpl(node(nodeIndex)), groupId);
+            res.set(service);
+            return service != null;
+        });
+
+        return res.get();
+    }
+
+    private @Nullable RaftGroupService currentLeaderServiceFor(ReplicationGroupId groupId) {
         return runningNodes()
                 .map(TestWrappers::unwrapIgniteImpl)
-                .flatMap(ignite -> {
-                    JraftServerImpl server = (JraftServerImpl) ignite.raftManager().server();
+                .map(igniteImpl -> raftGroupService(igniteImpl, groupId))
+                .filter(Objects::nonNull)
+                .filter(service -> service.getRaftNode().isLeader())
+                .findAny()
+                .orElse(null);
+    }

-                    Optional<RaftNodeId> maybeRaftNodeId = server.localNodes().stream()
-                            .filter(nodeId -> nodeId.groupId().equals(groupId))
-                            .findAny();
+    private static @Nullable RaftGroupService raftGroupService(IgniteImpl igniteImpl, ReplicationGroupId groupId) {
+        JraftServerImpl server = (JraftServerImpl) igniteImpl.raftManager().server();

-                    return maybeRaftNodeId.map(server::raftGroupService).stream();
-                })
-                .filter(service -> service.getRaftNode().isLeader())
+        return server.localNodes().stream()
+                .filter(nodeId -> nodeId.groupId().equals(groupId))
+                .map(server::raftGroupService)
+                // Stream#findAny() throws NPE on a null element; the old Optional#map-based lookup skipped missing services.
+                .filter(Objects::nonNull)
                 .findAny()
                 .orElse(null);
     }

Reply via email to