This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ca32cfad4a IGNITE-20640 Fix raft clients after stable assignments update (#2698)
ca32cfad4a is described below
commit ca32cfad4ab3f632e9e0576322523470be47d106
Author: Kirill Gusakov <[email protected]>
AuthorDate: Tue Oct 31 16:32:08 2023 +0300
IGNITE-20640 Fix raft clients after stable assignments update (#2698)
---
.../internal/raft/service/RaftGroupService.java | 7 +++
.../ignite/internal/raft/RaftGroupServiceImpl.java | 9 ++++
.../raft/client/TopologyAwareRaftGroupService.java | 5 ++
.../rebalance/ItRebalanceDistributedTest.java | 55 ++++++++++++++++++++++
.../internal/table/distributed/TableManager.java | 13 ++++-
5 files changed, 87 insertions(+), 2 deletions(-)
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
index a0fa7b07c4..1a7987d735 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
@@ -245,4 +245,11 @@ public interface RaftGroupService {
* @return Cluster service.
*/
ClusterService clusterService();
+
+ /**
+ * Replaces the peers and learners lists known to this raft client (client-side view only).
+ *
+ * @param configuration New peers and learners configuration.
+ */
+ void updateConfiguration(PeersAndLearners configuration);
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index f0cda3d837..5b3eaaab35 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -84,6 +84,8 @@ import org.jetbrains.annotations.Nullable;
/**
* The implementation of {@link RaftGroupService}.
*/
+// TODO: IGNITE-20738 Methods updateConfiguration/refreshMembers/*Peer/*Learner are not thread-safe
+// and can produce meaningless (peers, learners) pairs as a result.
public class RaftGroupServiceImpl implements RaftGroupService {
/** The logger. */
private static final IgniteLogger LOG =
Loggers.forClass(RaftGroupServiceImpl.class);
@@ -482,6 +484,13 @@ public class RaftGroupServiceImpl implements RaftGroupService {
return cluster;
}
+ @Override
+ public void updateConfiguration(PeersAndLearners configuration) { // Fields are updated one by one without locking (see the IGNITE-20738 TODO on this class).
+ peers = List.copyOf(configuration.peers()); // Unmodifiable snapshot of the new peers.
+ learners = List.copyOf(configuration.learners()); // Unmodifiable snapshot of the new learners.
+ leader = null; // Forget the cached leader: it may not be part of the new configuration.
+ }
+
private <R extends NetworkMessage> CompletableFuture<R> sendWithRetry(
Peer peer, Function<Peer, ? extends NetworkMessage> requestFactory
) {
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index 30b4a0c120..da51581fb7 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -495,4 +495,9 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
leaderPeer = null;
}
}
+
+ @Override
+ public void updateConfiguration(PeersAndLearners configuration) {
+ this.raftClient.updateConfiguration(configuration); // NOTE(review): leaderPeer cached by this wrapper is not cleared here - confirm that is intended.
+ }
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index a8b1c52b21..46efff26f4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -587,6 +587,61 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
verifyThatRaftNodesAndReplicasWereStartedOnlyOnce();
}
+ @Test
+ void testRaftClientsUpdatesAfterRebalance() throws Exception { // Verifies that partition raft clients on every node pick up the new peers after a stable assignments change.
+ Node node = getNode(0);
+
+ createZone(node, ZONE_NAME, 1, 1); // NOTE(review): presumably 1 partition and 1 replica; the checks below expect a single assignment for partition 0.
+
+ createTable(node, ZONE_NAME, TABLE_NAME);
+
+ assertTrue(waitForCondition(() -> getPartitionClusterNodes(node,
0).size() == 1, AWAIT_TIMEOUT_MILLIS));
+
+ Set<Assignment> assignmentsBeforeRebalance =
getPartitionClusterNodes(node, 0);
+
+ String newNodeNameForAssignment = nodes.stream() // Any node that does not currently host partition 0.
+ .map(n -> Assignment.forPeer(n.clusterService.nodeName()))
+ .filter(assignment ->
!assignmentsBeforeRebalance.contains(assignment))
+ .findFirst()
+ .orElseThrow()
+ .consistentId();
+
+ Set<Assignment> newAssignment =
Set.of(Assignment.forPeer(newNodeNameForAssignment));
+
+ // Write the new assignments to metastore as pending assignments to start the rebalance.
+ {
+ TablePartitionId partId = new TablePartitionId(getTableId(node,
TABLE_NAME), 0);
+
+ ByteArray partAssignmentsPendingKey =
pendingPartAssignmentsKey(partId);
+
+ byte[] bytesPendingAssignments = ByteUtils.toBytes(newAssignment);
+
+ node.metaStorageManager
+ .put(partAssignmentsPendingKey, bytesPendingAssignments)
+ .get(AWAIT_TIMEOUT_MILLIS, MILLISECONDS);
+ }
+
+ // Wait for rebalance to complete.
+ assertTrue(waitForCondition(
+ () -> nodes.stream().allMatch(n -> getPartitionClusterNodes(n,
0).equals(newAssignment)),
+ (long) AWAIT_TIMEOUT_MILLIS * nodes.size() // Cast before multiplying so the product is computed in long.
+ ));
+
+ // Check that raft clients on all nodes were updated with the new list
of peers.
+ assertTrue(waitForCondition(
+ () -> nodes.stream().allMatch(n ->
+ n.tableManager
+ .latestTables()
+ .get(getTableId(node, TABLE_NAME)) // Table id is resolved through node 0 on each poll.
+ .internalTable()
+ .partitionRaftGroupService(0)
+ .peers()
+ .equals(List.of(new
Peer(newNodeNameForAssignment)))),
+ (long) AWAIT_TIMEOUT_MILLIS * nodes.size()
+ ));
+
+ }
+
private void clearSpyInvocations() {
for (int i = 0; i < NODE_COUNT; i++) {
clearInvocations(getNode(i).raftManager);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 82aa20c5ba..3bfceb5ab4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -2157,6 +2157,13 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
return metaStorageMgr.get(pendingPartAssignmentsKey(tablePartitionId),
stableAssignmentsWatchEvent.revision())
.thenComposeAsync(pendingAssignmentsEntry -> {
+ // Update raft client peers and learners according to the
actual assignments.
+ CompletableFuture<Void> raftClientUpdateFuture =
tablesById(evt.revision()).thenAccept(t -> {
+ t.get(tableId).internalTable()
+
.partitionRaftGroupService(tablePartitionId.partitionId())
+
.updateConfiguration(configurationFromAssignments(stableAssignments));
+ });
+
byte[] pendingAssignmentsFromMetaStorage =
pendingAssignmentsEntry.value();
Set<Assignment> pendingAssignments =
pendingAssignmentsFromMetaStorage == null
@@ -2169,9 +2176,11 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
.noneMatch(assignment ->
assignment.consistentId().equals(localMemberName));
if (shouldStopLocalServices) {
- return stopAndDestroyPartition(tablePartitionId,
evt.revision());
+ return allOf(
+ raftClientUpdateFuture,
+ stopAndDestroyPartition(tablePartitionId,
evt.revision()));
} else {
- return completedFuture(null);
+ return raftClientUpdateFuture;
}
}, ioExecutor);
}