This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 525c3ee2a4 IGNITE-22554 Rename nonStableNodeAssignments and simplify code (#3969)
525c3ee2a4 is described below

commit 525c3ee2a42816e81d74c781dadea52f4d6e23a0
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Jun 26 11:14:40 2024 +0300

    IGNITE-22554 Rename nonStableNodeAssignments and simplify code (#3969)
---
 .../internal/table/distributed/TableManager.java   | 245 ++++++++++-----------
 1 file changed, 113 insertions(+), 132 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index f8c98a6d2a..240e53810e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -843,107 +843,59 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         int tableId = table.tableId();
 
         // Create new raft nodes according to new assignments.
-        return assignmentsFuture.thenCompose(assignments -> {
+        return assignmentsFuture.thenCompose(tableAssignments -> {
             // Empty assignments might be a valid case if tables are created 
from within cluster init HOCON
             // configuration, which is not supported now.
-            assert assignments != null : IgniteStringFormatter.format("Table 
[id={}] has empty assignments.", tableId);
+            assert tableAssignments != null : 
IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId);
 
-            int partitions = assignments.size();
+            int partitions = tableAssignments.size();
 
-            List<CompletableFuture<?>> futures = new ArrayList<>();
+            var futures = new CompletableFuture<?>[partitions];
 
             for (int i = 0; i < partitions; i++) {
                 int partId = i;
 
+                Assignments assignments = tableAssignments.get(i);
+
                 CompletableFuture<?> future = startPartitionAndStartClient(
-                                table,
-                                partId,
-                                assignments.get(partId),
-                                null,
-                                zoneId,
-                                isRecovery
-                        )
+                        table,
+                        partId,
+                        localMemberAssignment(assignments),
+                        assignments,
+                        zoneId,
+                        isRecovery
+                )
                         .whenComplete((res, ex) -> {
                             if (ex != null) {
                                 LOG.warn("Unable to update raft groups on the 
node [tableId={}, partitionId={}]", ex, tableId, partId);
                             }
                         });
 
-                futures.add(future);
+                futures[i] = future;
             }
 
-            return allOf(futures.toArray(new CompletableFuture<?>[0]));
+            return allOf(futures);
         });
     }
 
     private CompletableFuture<Void> startPartitionAndStartClient(
             TableImpl table,
             int partId,
-            Assignments assignments,
-            @Nullable Assignments nonStableNodeAssignments,
+            @Nullable Assignment localMemberAssignment,
+            Assignments stableAssignments,
             int zoneId,
             boolean isRecovery
     ) {
-        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
-
         int tableId = table.tableId();
 
-        InternalTable internalTbl = table.internalTable();
-
-        Assignment localMemberAssignment = assignments.nodes().stream()
-                .filter(a -> a.consistentId().equals(localNode().name()))
-                .findAny()
-                .orElse(null);
+        var internalTbl = (InternalTableImpl) table.internalTable();
 
-        PeersAndLearners realConfiguration = 
fromAssignments(assignments.nodes());
-        PeersAndLearners newConfiguration = nonStableNodeAssignments == null
-                ? realConfiguration : 
fromAssignments(nonStableNodeAssignments.nodes());
+        PeersAndLearners stablePeersAndLearners = 
fromAssignments(stableAssignments.nodes());
 
         TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId);
 
-        var safeTimeTracker = new 
PendingComparableValuesTracker<HybridTimestamp, Void>(
-                new HybridTimestamp(1, 0)
-        );
-        var storageIndexTracker = new PendingComparableValuesTracker<Long, 
Void>(0L);
-
-        PartitionStorages partitionStorages = getPartitionStorages(table, 
partId);
-
-        PartitionDataStorage partitionDataStorage = 
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
-                internalTbl, partId);
-
-        storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), 
null);
-
-        PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
-                partId,
-                partitionDataStorage,
-                table,
-                safeTimeTracker,
-                storageUpdateConfig
-        );
-
-        boolean shouldStartRaftListeners = 
shouldStartRaftListeners(assignments, nonStableNodeAssignments);
-
-        if (shouldStartRaftListeners) {
-            ((InternalTableImpl) internalTbl).updatePartitionTrackers(partId, 
safeTimeTracker, storageIndexTracker);
-
-            mvGc.addStorage(replicaGrpId, 
partitionUpdateHandlers.gcUpdateHandler);
-        }
-
-        // TODO: will be removed in 
https://issues.apache.org/jira/browse/IGNITE-22315
-        Supplier<RaftGroupService> getCachedRaftClient = () -> {
-            try {
-                // Return existing service if it's already started.
-                return internalTbl
-                        .tableRaftService()
-                        .partitionRaftGroupService(replicaGrpId.partitionId());
-            } catch (IgniteInternalException e) {
-                // We use "IgniteInternalException" in accordance with the 
javadoc of "partitionRaftGroupService" method.
-                return null;
-            }
-        };
-
         // TODO: will be removed in 
https://issues.apache.org/jira/browse/IGNITE-22218
-        Consumer<RaftGroupService> updateTableRaftService = (raftClient) -> 
((InternalTableImpl) internalTbl)
+        Consumer<RaftGroupService> updateTableRaftService = raftClient -> 
internalTbl
                 .tableRaftService()
                 .updateInternalTableRaftGroupService(partId, raftClient);
 
@@ -954,14 +906,12 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                     ? 
partitionReplicatorNodeRecovery.initiateGroupReentryIfNeeded(
                             replicaGrpId,
                             internalTbl,
-                            newConfiguration,
+                            stablePeersAndLearners,
                             localMemberAssignment
                     )
                     : trueCompletedFuture();
 
-            Assignments forcedAssignments = nonStableNodeAssignments != null 
&& nonStableNodeAssignments.force()
-                    ? nonStableNodeAssignments
-                    : null;
+            Assignments forcedAssignments = stableAssignments.force() ? 
stableAssignments : null;
 
             startGroupFut = replicaMgr.weakStartReplica(
                     replicaGrpId,
@@ -972,7 +922,28 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                         }
 
                         // (2) Otherwise let's start replica manually
-                        InternalTable internalTable = table.internalTable();
+                        var safeTimeTracker = new 
PendingComparableValuesTracker<HybridTimestamp, 
Void>(HybridTimestamp.MIN_VALUE);
+
+                        var storageIndexTracker = new 
PendingComparableValuesTracker<Long, Void>(0L);
+
+                        PartitionStorages partitionStorages = 
getPartitionStorages(table, partId);
+
+                        PartitionDataStorage partitionDataStorage = 
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
+                                internalTbl, partId);
+
+                        
storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null);
+
+                        PartitionUpdateHandlers partitionUpdateHandlers = 
createPartitionUpdateHandlers(
+                                partId,
+                                partitionDataStorage,
+                                table,
+                                safeTimeTracker,
+                                storageUpdateConfig
+                        );
+
+                        internalTbl.updatePartitionTrackers(partId, 
safeTimeTracker, storageIndexTracker);
+
+                        mvGc.addStorage(replicaGrpId, 
partitionUpdateHandlers.gcUpdateHandler);
 
                         RaftGroupListener raftGroupListener = new 
PartitionListener(
                                 txManager,
@@ -988,7 +959,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                         );
 
                         SnapshotStorageFactory snapshotStorageFactory = 
createSnapshotStorageFactory(replicaGrpId,
-                                partitionUpdateHandlers, internalTable);
+                                partitionUpdateHandlers, internalTbl);
 
                         Function<RaftGroupService, ReplicaListener> 
createListener = (raftClient) -> createReplicaListener(
                                 replicaGrpId,
@@ -1001,10 +972,10 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
                         RaftGroupEventsListener raftGroupEventsListener = 
createRaftGroupEventsListener(zoneId, replicaGrpId);
 
-                        MvTableStorage mvTableStorage = 
internalTable.storage();
+                        MvTableStorage mvTableStorage = internalTbl.storage();
 
                         try {
-                            var ret = replicaMgr.startReplica(
+                            return replicaMgr.startReplica(
                                     raftGroupEventsListener,
                                     raftGroupListener,
                                     mvTableStorage.isVolatile(),
@@ -1013,8 +984,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                                     createListener,
                                     storageIndexTracker,
                                     replicaGrpId,
-                                    newConfiguration);
-                            return ret;
+                                    stablePeersAndLearners);
                         } catch (NodeStoppingException e) {
                             throw new AssertionError("Loza was stopped before 
Table manager", e);
                         }
@@ -1027,17 +997,29 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             startGroupFut = falseCompletedFuture();
         }
 
-        startGroupFut
+        return startGroupFut
                 // TODO: the stage will be removed after 
https://issues.apache.org/jira/browse/IGNITE-22315
                 .thenComposeAsync(isReplicaStarted -> inBusyLock(busyLock, () 
-> {
                     if (isReplicaStarted) {
                         return nullCompletedFuture();
                     }
 
+                    // TODO: will be removed in 
https://issues.apache.org/jira/browse/IGNITE-22315
+                    Supplier<RaftGroupService> getCachedRaftClient = () -> {
+                        try {
+                            // Return existing service if it's already started.
+                            return internalTbl
+                                    .tableRaftService()
+                                    
.partitionRaftGroupService(replicaGrpId.partitionId());
+                        } catch (IgniteInternalException e) {
+                            // We use "IgniteInternalException" in accordance 
with the javadoc of "partitionRaftGroupService" method.
+                            return null;
+                        }
+                    };
+
                     CompletableFuture<TopologyAwareRaftGroupService> 
newRaftClientFut;
                     try {
-                        newRaftClientFut = replicaMgr.startRaftClient(
-                                replicaGrpId, newConfiguration, 
getCachedRaftClient);
+                        newRaftClientFut = 
replicaMgr.startRaftClient(replicaGrpId, stablePeersAndLearners, 
getCachedRaftClient);
                     } catch (NodeStoppingException e) {
                         throw new CompletionException(e);
                     }
@@ -1046,23 +1028,15 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 .whenComplete((res, ex) -> {
                     if (ex != null) {
                         LOG.warn("Unable to update raft groups on the node 
[tableId={}, partitionId={}]", ex, tableId, partId);
-
-                        resultFuture.completeExceptionally(ex);
-                    } else {
-                        resultFuture.complete(null);
                     }
                 });
-
-        return resultFuture;
     }
 
-    private boolean shouldStartRaftListeners(Assignments assignments, 
@Nullable Assignments nonStableNodeAssignments) {
-        Set<Assignment> nodesForStarting = nonStableNodeAssignments == null
-                ? assignments.nodes()
-                : RebalanceUtil.subtract(nonStableNodeAssignments.nodes(), 
assignments.nodes());
-        return nodesForStarting
-                .stream()
-                .anyMatch(assignment -> 
assignment.consistentId().equals(localNode().name()));
+    @Nullable
+    private Assignment localMemberAssignment(Assignments assignments) {
+        Assignment localMemberAssignment = 
Assignment.forPeer(localNode().name());
+
+        return assignments.nodes().contains(localMemberAssignment) ? 
localMemberAssignment : null;
     }
 
     private PartitionMover createPartitionMover(TablePartitionId replicaGrpId) 
{
@@ -1813,16 +1787,14 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                         }
 
                         return 
setTablesPartitionCountersForRebalance(replicaGrpId, revision, 
pendingAssignments.force())
-                                .thenCompose(r ->
-                                        handleChangePendingAssignmentEvent(
-                                                replicaGrpId,
-                                                table,
-                                                pendingAssignments,
-                                                stableAssignments == null ? 
emptySet() : stableAssignments.nodes(),
-                                                revision,
-                                                isRecovery
-                                        )
-                                )
+                                .thenCompose(r -> 
handleChangePendingAssignmentEvent(
+                                        replicaGrpId,
+                                        table,
+                                        stableAssignments,
+                                        pendingAssignments,
+                                        revision,
+                                        isRecovery
+                                ))
                                 .thenCompose(v -> 
changePeersOnRebalance(table, replicaGrpId, pendingAssignments.nodes(), 
revision));
                     } finally {
                         busyLock.leaveBusy();
@@ -1834,45 +1806,45 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     private CompletableFuture<Void> handleChangePendingAssignmentEvent(
             TablePartitionId replicaGrpId,
             TableImpl tbl,
+            @Nullable Assignments stableAssignments,
             Assignments pendingAssignments,
-            Set<Assignment> stableAssignments,
             long revision,
             boolean isRecovery
     ) {
-        ClusterNode localMember = localNode();
-
         boolean pendingAssignmentsAreForced = pendingAssignments.force();
         Set<Assignment> pendingAssignmentsNodes = pendingAssignments.nodes();
 
         // Start a new Raft node and Replica if this node has appeared in the 
new assignments.
-        boolean shouldStartLocalGroupNode = pendingAssignmentsNodes.stream()
-                .filter(assignment -> 
localMember.name().equals(assignment.consistentId()))
-                .anyMatch(assignment -> 
!stableAssignments.contains(assignment));
-
-        CompletableFuture<Void> localServicesStartFuture;
-
-        int tableId = tbl.tableId();
+        Assignment localMemberAssignment = 
localMemberAssignment(pendingAssignments);
 
-        int zoneId = getTableDescriptor(tableId, 
catalogService.latestCatalogVersion()).zoneId();
+        boolean shouldStartLocalGroupNode = localMemberAssignment != null
+                && (stableAssignments == null || 
!stableAssignments.nodes().contains(localMemberAssignment));
 
         // This is a set of assignments for nodes that are not the part of 
stable assignments, i.e. unstable part of the distribution.
         // For regular pending assignments we use (old) stable set, so that 
none of new nodes would be able to propose itself as a leader.
         // For forced assignments, we should do the same thing, but only for 
the subset of stable set that is alive right now. Dead nodes
         // are excluded. It is calculated precisely as an intersection between 
forced assignments and (old) stable assignments.
-        Assignments nonStableNodeAssignments = pendingAssignmentsAreForced
-                ? Assignments.forced(intersect(stableAssignments, 
pendingAssignmentsNodes))
-                : Assignments.of(stableAssignments);
-
-        // This condition can only pass if all stable nodes are dead, and we 
start new raft group from scratch.
-        // In this case new initial configuration must match new forced 
assignments.
-        if (nonStableNodeAssignments.nodes().isEmpty()) {
-            nonStableNodeAssignments = 
Assignments.forced(pendingAssignmentsNodes);
+        Assignments computedStableAssignments;
+
+        if (stableAssignments == null) {
+            // This condition can only pass if all stable nodes are dead, and 
we start new raft group from scratch.
+            // In this case new initial configuration must match new forced 
assignments.
+            computedStableAssignments = 
Assignments.forced(pendingAssignmentsNodes);
+        } else if (pendingAssignmentsAreForced) {
+            // In case of forced assignments we need to remove nodes that are 
present in the stable set but are missing from the
+            // pending set. Such operation removes dead stable nodes from the 
resulting stable set, which guarantees that we will
+            // have a live majority.
+            Set<Assignment> intersection = 
intersect(stableAssignments.nodes(), pendingAssignmentsNodes);
+
+            computedStableAssignments = intersection.isEmpty() ? 
pendingAssignments : Assignments.forced(intersection);
+        } else {
+            computedStableAssignments = stableAssignments;
         }
 
-        Assignments nonStableNodeAssignmentsFinal = nonStableNodeAssignments;
-
         int partitionId = replicaGrpId.partitionId();
 
+        CompletableFuture<Void> localServicesStartFuture;
+
         if (shouldStartLocalGroupNode) {
             PartitionSet singlePartitionIdSet = PartitionSet.of(partitionId);
 
@@ -1893,11 +1865,13 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                             );
                         }
 
+                        int zoneId = getTableDescriptor(tbl.tableId(), 
catalogService.latestCatalogVersion()).zoneId();
+
                         return startPartitionAndStartClient(
                                 tbl,
                                 replicaGrpId.partitionId(),
-                                pendingAssignments,
-                                nonStableNodeAssignmentsFinal,
+                                localMemberAssignment,
+                                computedStableAssignments,
                                 zoneId,
                                 isRecovery
                         );
@@ -1905,7 +1879,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         } else {
             localServicesStartFuture = runAsync(() -> {
                 if (pendingAssignmentsAreForced && 
replicaMgr.isReplicaStarted(replicaGrpId)) {
-                    replicaMgr.resetPeers(replicaGrpId, 
fromAssignments(nonStableNodeAssignmentsFinal.nodes()));
+                    replicaMgr.resetPeers(replicaGrpId, 
fromAssignments(computedStableAssignments.nodes()));
                 }
             }, ioExecutor);
         }
@@ -1913,14 +1887,14 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         return localServicesStartFuture.thenRunAsync(() -> {
             // For forced assignments, we exclude dead stable nodes, and all 
alive stable nodes are already in pending assignments.
             // Union is not required in such a case.
-            Set<Assignment> cfg = pendingAssignmentsAreForced
+            Set<Assignment> newAssignments = pendingAssignmentsAreForced || 
stableAssignments == null
                     ? pendingAssignmentsNodes
-                    : union(pendingAssignmentsNodes, stableAssignments);
+                    : union(pendingAssignmentsNodes, 
stableAssignments.nodes());
 
             tbl.internalTable()
                     .tableRaftService()
                     .partitionRaftGroupService(partitionId)
-                    .updateConfiguration(fromAssignments(cfg));
+                    .updateConfiguration(fromAssignments(newAssignments));
         }, ioExecutor);
     }
 
@@ -2570,7 +2544,14 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
                 int zoneId = getTableDescriptor(tablePartitionId.tableId(), 
catalogService.latestCatalogVersion()).zoneId();
 
-                return startPartitionAndStartClient(table, 
tablePartitionId.partitionId(), stableAssignments, null, zoneId, false);
+                return startPartitionAndStartClient(
+                        table,
+                        tablePartitionId.partitionId(),
+                        localMemberAssignment(stableAssignments),
+                        stableAssignments,
+                        zoneId,
+                        false
+                );
             }, ioExecutor);
         }), ioExecutor));
     }

Reply via email to