This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ca32cfad4a IGNITE-20640 Fix raft clients after stable assignments update (#2698)
ca32cfad4a is described below

commit ca32cfad4ab3f632e9e0576322523470be47d106
Author: Kirill Gusakov <[email protected]>
AuthorDate: Tue Oct 31 16:32:08 2023 +0300

    IGNITE-20640 Fix raft clients after stable assignments update (#2698)
---
 .../internal/raft/service/RaftGroupService.java    |  7 +++
 .../ignite/internal/raft/RaftGroupServiceImpl.java |  9 ++++
 .../raft/client/TopologyAwareRaftGroupService.java |  5 ++
 .../rebalance/ItRebalanceDistributedTest.java      | 55 ++++++++++++++++++++++
 .../internal/table/distributed/TableManager.java   | 13 ++++-
 5 files changed, 87 insertions(+), 2 deletions(-)

diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
index a0fa7b07c4..1a7987d735 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
@@ -245,4 +245,11 @@ public interface RaftGroupService {
      * @return Cluster service.
      */
     ClusterService clusterService();
+
+    /**
+     * Updates peers and learners lists in raft client.
+     *
+     * @param configuration Peers and learners configuration.
+     */
+    void updateConfiguration(PeersAndLearners configuration);
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index f0cda3d837..5b3eaaab35 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -84,6 +84,8 @@ import org.jetbrains.annotations.Nullable;
 /**
  * The implementation of {@link RaftGroupService}.
  */
+// TODO: IGNITE-20738 Methods 
updateConfiguration/refreshMembers/*Peer/*Learner are not thread-safe
+// and can produce meaningless (peers, learners) pairs as a result.
 public class RaftGroupServiceImpl implements RaftGroupService {
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(RaftGroupServiceImpl.class);
@@ -482,6 +484,13 @@ public class RaftGroupServiceImpl implements RaftGroupService {
         return cluster;
     }
 
+    @Override
+    public void updateConfiguration(PeersAndLearners configuration) {
+        peers = List.copyOf(configuration.peers());
+        learners = List.copyOf(configuration.learners());
+        leader = null;
+    }
+
     private <R extends NetworkMessage> CompletableFuture<R> sendWithRetry(
             Peer peer, Function<Peer, ? extends NetworkMessage> requestFactory
     ) {
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index 30b4a0c120..da51581fb7 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -495,4 +495,9 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
             leaderPeer = null;
         }
     }
+
+    @Override
+    public void updateConfiguration(PeersAndLearners configuration) {
+        this.raftClient.updateConfiguration(configuration);
+    }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index a8b1c52b21..46efff26f4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -587,6 +587,61 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
         verifyThatRaftNodesAndReplicasWereStartedOnlyOnce();
     }
 
+    @Test
+    void testRaftClientsUpdatesAfterRebalance() throws Exception {
+        Node node = getNode(0);
+
+        createZone(node, ZONE_NAME, 1, 1);
+
+        createTable(node, ZONE_NAME, TABLE_NAME);
+
+        assertTrue(waitForCondition(() -> getPartitionClusterNodes(node, 
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+
+        Set<Assignment> assignmentsBeforeRebalance = 
getPartitionClusterNodes(node, 0);
+
+        String newNodeNameForAssignment = nodes.stream()
+                .map(n -> Assignment.forPeer(n.clusterService.nodeName()))
+                .filter(assignment -> 
!assignmentsBeforeRebalance.contains(assignment))
+                .findFirst()
+                .orElseThrow()
+                .consistentId();
+
+        Set<Assignment> newAssignment = 
Set.of(Assignment.forPeer(newNodeNameForAssignment));
+
+        // Write the new assignments to metastore as a pending assignments.
+        {
+            TablePartitionId partId = new TablePartitionId(getTableId(node, 
TABLE_NAME), 0);
+
+            ByteArray partAssignmentsPendingKey = 
pendingPartAssignmentsKey(partId);
+
+            byte[] bytesPendingAssignments = ByteUtils.toBytes(newAssignment);
+
+            node.metaStorageManager
+                    .put(partAssignmentsPendingKey, bytesPendingAssignments)
+                    .get(AWAIT_TIMEOUT_MILLIS, MILLISECONDS);
+        }
+
+        // Wait for rebalance to complete.
+        assertTrue(waitForCondition(
+                () -> nodes.stream().allMatch(n -> getPartitionClusterNodes(n, 
0).equals(newAssignment)),
+                (long) AWAIT_TIMEOUT_MILLIS * nodes.size()
+        ));
+
+        // Check that raft clients on all nodes were updated with the new list 
of peers.
+        assertTrue(waitForCondition(
+                () -> nodes.stream().allMatch(n ->
+                        n.tableManager
+                                .latestTables()
+                                .get(getTableId(node, TABLE_NAME))
+                                .internalTable()
+                                .partitionRaftGroupService(0)
+                                .peers()
+                                .equals(List.of(new 
Peer(newNodeNameForAssignment)))),
+                (long) AWAIT_TIMEOUT_MILLIS * nodes.size()
+        ));
+
+    }
+
     private void clearSpyInvocations() {
         for (int i = 0; i < NODE_COUNT; i++) {
             clearInvocations(getNode(i).raftManager);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 82aa20c5ba..3bfceb5ab4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -2157,6 +2157,13 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
 
         return metaStorageMgr.get(pendingPartAssignmentsKey(tablePartitionId), 
stableAssignmentsWatchEvent.revision())
                 .thenComposeAsync(pendingAssignmentsEntry -> {
+                    // Update raft client peers and learners according to the 
actual assignments.
+                    CompletableFuture<Void> raftClientUpdateFuture = 
tablesById(evt.revision()).thenAccept(t -> {
+                        t.get(tableId).internalTable()
+                                
.partitionRaftGroupService(tablePartitionId.partitionId())
+                                
.updateConfiguration(configurationFromAssignments(stableAssignments));
+                    });
+
                     byte[] pendingAssignmentsFromMetaStorage = 
pendingAssignmentsEntry.value();
 
                     Set<Assignment> pendingAssignments = 
pendingAssignmentsFromMetaStorage == null
@@ -2169,9 +2176,11 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
                             .noneMatch(assignment -> 
assignment.consistentId().equals(localMemberName));
 
                     if (shouldStopLocalServices) {
-                        return stopAndDestroyPartition(tablePartitionId, 
evt.revision());
+                        return allOf(
+                                raftClientUpdateFuture,
+                                stopAndDestroyPartition(tablePartitionId, 
evt.revision()));
                     } else {
-                        return completedFuture(null);
+                        return raftClientUpdateFuture;
                     }
                 }, ioExecutor);
     }

Reply via email to