This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 525c3ee2a4 IGNITE-22554 Rename nonStableNodeAssignments and simplify code (#3969)
525c3ee2a4 is described below
commit 525c3ee2a42816e81d74c781dadea52f4d6e23a0
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Jun 26 11:14:40 2024 +0300
IGNITE-22554 Rename nonStableNodeAssignments and simplify code (#3969)
---
.../internal/table/distributed/TableManager.java | 245 ++++++++++-----------
1 file changed, 113 insertions(+), 132 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index f8c98a6d2a..240e53810e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -843,107 +843,59 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
int tableId = table.tableId();
// Create new raft nodes according to new assignments.
- return assignmentsFuture.thenCompose(assignments -> {
+ return assignmentsFuture.thenCompose(tableAssignments -> {
// Empty assignments might be a valid case if tables are created
from within cluster init HOCON
// configuration, which is not supported now.
- assert assignments != null : IgniteStringFormatter.format("Table
[id={}] has empty assignments.", tableId);
+ assert tableAssignments != null :
IgniteStringFormatter.format("Table [id={}] has empty assignments.", tableId);
- int partitions = assignments.size();
+ int partitions = tableAssignments.size();
- List<CompletableFuture<?>> futures = new ArrayList<>();
+ var futures = new CompletableFuture<?>[partitions];
for (int i = 0; i < partitions; i++) {
int partId = i;
+ Assignments assignments = tableAssignments.get(i);
+
CompletableFuture<?> future = startPartitionAndStartClient(
- table,
- partId,
- assignments.get(partId),
- null,
- zoneId,
- isRecovery
- )
+ table,
+ partId,
+ localMemberAssignment(assignments),
+ assignments,
+ zoneId,
+ isRecovery
+ )
.whenComplete((res, ex) -> {
if (ex != null) {
LOG.warn("Unable to update raft groups on the
node [tableId={}, partitionId={}]", ex, tableId, partId);
}
});
- futures.add(future);
+ futures[i] = future;
}
- return allOf(futures.toArray(new CompletableFuture<?>[0]));
+ return allOf(futures);
});
}
private CompletableFuture<Void> startPartitionAndStartClient(
TableImpl table,
int partId,
- Assignments assignments,
- @Nullable Assignments nonStableNodeAssignments,
+ @Nullable Assignment localMemberAssignment,
+ Assignments stableAssignments,
int zoneId,
boolean isRecovery
) {
- CompletableFuture<Void> resultFuture = new CompletableFuture<>();
-
int tableId = table.tableId();
- InternalTable internalTbl = table.internalTable();
-
- Assignment localMemberAssignment = assignments.nodes().stream()
- .filter(a -> a.consistentId().equals(localNode().name()))
- .findAny()
- .orElse(null);
+ var internalTbl = (InternalTableImpl) table.internalTable();
- PeersAndLearners realConfiguration =
fromAssignments(assignments.nodes());
- PeersAndLearners newConfiguration = nonStableNodeAssignments == null
- ? realConfiguration :
fromAssignments(nonStableNodeAssignments.nodes());
+ PeersAndLearners stablePeersAndLearners =
fromAssignments(stableAssignments.nodes());
TablePartitionId replicaGrpId = new TablePartitionId(tableId, partId);
- var safeTimeTracker = new
PendingComparableValuesTracker<HybridTimestamp, Void>(
- new HybridTimestamp(1, 0)
- );
- var storageIndexTracker = new PendingComparableValuesTracker<Long,
Void>(0L);
-
- PartitionStorages partitionStorages = getPartitionStorages(table,
partId);
-
- PartitionDataStorage partitionDataStorage =
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
- internalTbl, partId);
-
- storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(),
null);
-
- PartitionUpdateHandlers partitionUpdateHandlers =
createPartitionUpdateHandlers(
- partId,
- partitionDataStorage,
- table,
- safeTimeTracker,
- storageUpdateConfig
- );
-
- boolean shouldStartRaftListeners =
shouldStartRaftListeners(assignments, nonStableNodeAssignments);
-
- if (shouldStartRaftListeners) {
- ((InternalTableImpl) internalTbl).updatePartitionTrackers(partId,
safeTimeTracker, storageIndexTracker);
-
- mvGc.addStorage(replicaGrpId,
partitionUpdateHandlers.gcUpdateHandler);
- }
-
- // TODO: will be removed in
https://issues.apache.org/jira/browse/IGNITE-22315
- Supplier<RaftGroupService> getCachedRaftClient = () -> {
- try {
- // Return existing service if it's already started.
- return internalTbl
- .tableRaftService()
- .partitionRaftGroupService(replicaGrpId.partitionId());
- } catch (IgniteInternalException e) {
- // We use "IgniteInternalException" in accordance with the
javadoc of "partitionRaftGroupService" method.
- return null;
- }
- };
-
// TODO: will be removed in
https://issues.apache.org/jira/browse/IGNITE-22218
- Consumer<RaftGroupService> updateTableRaftService = (raftClient) ->
((InternalTableImpl) internalTbl)
+ Consumer<RaftGroupService> updateTableRaftService = raftClient ->
internalTbl
.tableRaftService()
.updateInternalTableRaftGroupService(partId, raftClient);
@@ -954,14 +906,12 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
?
partitionReplicatorNodeRecovery.initiateGroupReentryIfNeeded(
replicaGrpId,
internalTbl,
- newConfiguration,
+ stablePeersAndLearners,
localMemberAssignment
)
: trueCompletedFuture();
- Assignments forcedAssignments = nonStableNodeAssignments != null
&& nonStableNodeAssignments.force()
- ? nonStableNodeAssignments
- : null;
+ Assignments forcedAssignments = stableAssignments.force() ?
stableAssignments : null;
startGroupFut = replicaMgr.weakStartReplica(
replicaGrpId,
@@ -972,7 +922,28 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
}
// (2) Otherwise let's start replica manually
- InternalTable internalTable = table.internalTable();
+ var safeTimeTracker = new
PendingComparableValuesTracker<HybridTimestamp,
Void>(HybridTimestamp.MIN_VALUE);
+
+ var storageIndexTracker = new
PendingComparableValuesTracker<Long, Void>(0L);
+
+ PartitionStorages partitionStorages =
getPartitionStorages(table, partId);
+
+ PartitionDataStorage partitionDataStorage =
partitionDataStorage(partitionStorages.getMvPartitionStorage(),
+ internalTbl, partId);
+
+
storageIndexTracker.update(partitionDataStorage.lastAppliedIndex(), null);
+
+ PartitionUpdateHandlers partitionUpdateHandlers =
createPartitionUpdateHandlers(
+ partId,
+ partitionDataStorage,
+ table,
+ safeTimeTracker,
+ storageUpdateConfig
+ );
+
+ internalTbl.updatePartitionTrackers(partId,
safeTimeTracker, storageIndexTracker);
+
+ mvGc.addStorage(replicaGrpId,
partitionUpdateHandlers.gcUpdateHandler);
RaftGroupListener raftGroupListener = new
PartitionListener(
txManager,
@@ -988,7 +959,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
);
SnapshotStorageFactory snapshotStorageFactory =
createSnapshotStorageFactory(replicaGrpId,
- partitionUpdateHandlers, internalTable);
+ partitionUpdateHandlers, internalTbl);
Function<RaftGroupService, ReplicaListener>
createListener = (raftClient) -> createReplicaListener(
replicaGrpId,
@@ -1001,10 +972,10 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
RaftGroupEventsListener raftGroupEventsListener =
createRaftGroupEventsListener(zoneId, replicaGrpId);
- MvTableStorage mvTableStorage =
internalTable.storage();
+ MvTableStorage mvTableStorage = internalTbl.storage();
try {
- var ret = replicaMgr.startReplica(
+ return replicaMgr.startReplica(
raftGroupEventsListener,
raftGroupListener,
mvTableStorage.isVolatile(),
@@ -1013,8 +984,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
createListener,
storageIndexTracker,
replicaGrpId,
- newConfiguration);
- return ret;
+ stablePeersAndLearners);
} catch (NodeStoppingException e) {
throw new AssertionError("Loza was stopped before
Table manager", e);
}
@@ -1027,17 +997,29 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
startGroupFut = falseCompletedFuture();
}
- startGroupFut
+ return startGroupFut
// TODO: the stage will be removed after
https://issues.apache.org/jira/browse/IGNITE-22315
.thenComposeAsync(isReplicaStarted -> inBusyLock(busyLock, ()
-> {
if (isReplicaStarted) {
return nullCompletedFuture();
}
+ // TODO: will be removed in
https://issues.apache.org/jira/browse/IGNITE-22315
+ Supplier<RaftGroupService> getCachedRaftClient = () -> {
+ try {
+ // Return existing service if it's already started.
+ return internalTbl
+ .tableRaftService()
+
.partitionRaftGroupService(replicaGrpId.partitionId());
+ } catch (IgniteInternalException e) {
+ // We use "IgniteInternalException" in accordance
with the javadoc of "partitionRaftGroupService" method.
+ return null;
+ }
+ };
+
CompletableFuture<TopologyAwareRaftGroupService>
newRaftClientFut;
try {
- newRaftClientFut = replicaMgr.startRaftClient(
- replicaGrpId, newConfiguration,
getCachedRaftClient);
+ newRaftClientFut =
replicaMgr.startRaftClient(replicaGrpId, stablePeersAndLearners,
getCachedRaftClient);
} catch (NodeStoppingException e) {
throw new CompletionException(e);
}
@@ -1046,23 +1028,15 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
.whenComplete((res, ex) -> {
if (ex != null) {
LOG.warn("Unable to update raft groups on the node
[tableId={}, partitionId={}]", ex, tableId, partId);
-
- resultFuture.completeExceptionally(ex);
- } else {
- resultFuture.complete(null);
}
});
-
- return resultFuture;
}
- private boolean shouldStartRaftListeners(Assignments assignments,
@Nullable Assignments nonStableNodeAssignments) {
- Set<Assignment> nodesForStarting = nonStableNodeAssignments == null
- ? assignments.nodes()
- : RebalanceUtil.subtract(nonStableNodeAssignments.nodes(),
assignments.nodes());
- return nodesForStarting
- .stream()
- .anyMatch(assignment ->
assignment.consistentId().equals(localNode().name()));
+ @Nullable
+ private Assignment localMemberAssignment(Assignments assignments) {
+ Assignment localMemberAssignment =
Assignment.forPeer(localNode().name());
+
+ return assignments.nodes().contains(localMemberAssignment) ?
localMemberAssignment : null;
}
private PartitionMover createPartitionMover(TablePartitionId replicaGrpId)
{
@@ -1813,16 +1787,14 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
}
return
setTablesPartitionCountersForRebalance(replicaGrpId, revision,
pendingAssignments.force())
- .thenCompose(r ->
- handleChangePendingAssignmentEvent(
- replicaGrpId,
- table,
- pendingAssignments,
- stableAssignments == null ?
emptySet() : stableAssignments.nodes(),
- revision,
- isRecovery
- )
- )
+ .thenCompose(r ->
handleChangePendingAssignmentEvent(
+ replicaGrpId,
+ table,
+ stableAssignments,
+ pendingAssignments,
+ revision,
+ isRecovery
+ ))
.thenCompose(v ->
changePeersOnRebalance(table, replicaGrpId, pendingAssignments.nodes(),
revision));
} finally {
busyLock.leaveBusy();
@@ -1834,45 +1806,45 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
private CompletableFuture<Void> handleChangePendingAssignmentEvent(
TablePartitionId replicaGrpId,
TableImpl tbl,
+ @Nullable Assignments stableAssignments,
Assignments pendingAssignments,
- Set<Assignment> stableAssignments,
long revision,
boolean isRecovery
) {
- ClusterNode localMember = localNode();
-
boolean pendingAssignmentsAreForced = pendingAssignments.force();
Set<Assignment> pendingAssignmentsNodes = pendingAssignments.nodes();
// Start a new Raft node and Replica if this node has appeared in the
new assignments.
- boolean shouldStartLocalGroupNode = pendingAssignmentsNodes.stream()
- .filter(assignment ->
localMember.name().equals(assignment.consistentId()))
- .anyMatch(assignment ->
!stableAssignments.contains(assignment));
-
- CompletableFuture<Void> localServicesStartFuture;
-
- int tableId = tbl.tableId();
+ Assignment localMemberAssignment =
localMemberAssignment(pendingAssignments);
- int zoneId = getTableDescriptor(tableId,
catalogService.latestCatalogVersion()).zoneId();
+ boolean shouldStartLocalGroupNode = localMemberAssignment != null
+ && (stableAssignments == null ||
!stableAssignments.nodes().contains(localMemberAssignment));
// This is a set of assignments for nodes that are not the part of
stable assignments, i.e. unstable part of the distribution.
// For regular pending assignments we use (old) stable set, so that
none of new nodes would be able to propose itself as a leader.
// For forced assignments, we should do the same thing, but only for
the subset of stable set that is alive right now. Dead nodes
// are excluded. It is calculated precisely as an intersection between
forced assignments and (old) stable assignments.
- Assignments nonStableNodeAssignments = pendingAssignmentsAreForced
- ? Assignments.forced(intersect(stableAssignments,
pendingAssignmentsNodes))
- : Assignments.of(stableAssignments);
-
- // This condition can only pass if all stable nodes are dead, and we
start new raft group from scratch.
- // In this case new initial configuration must match new forced
assignments.
- if (nonStableNodeAssignments.nodes().isEmpty()) {
- nonStableNodeAssignments =
Assignments.forced(pendingAssignmentsNodes);
+ Assignments computedStableAssignments;
+
+ if (stableAssignments == null) {
+ // This condition can only pass if all stable nodes are dead, and
we start new raft group from scratch.
+ // In this case new initial configuration must match new forced
assignments.
+ computedStableAssignments =
Assignments.forced(pendingAssignmentsNodes);
+ } else if (pendingAssignmentsAreForced) {
+ // In case of forced assignments we need to remove nodes that are
present in the stable set but are missing from the
+ // pending set. Such operation removes dead stable nodes from the
resulting stable set, which guarantees that we will
+ // have a live majority.
+ Set<Assignment> intersection =
intersect(stableAssignments.nodes(), pendingAssignmentsNodes);
+
+ computedStableAssignments = intersection.isEmpty() ?
pendingAssignments : Assignments.forced(intersection);
+ } else {
+ computedStableAssignments = stableAssignments;
}
- Assignments nonStableNodeAssignmentsFinal = nonStableNodeAssignments;
-
int partitionId = replicaGrpId.partitionId();
+ CompletableFuture<Void> localServicesStartFuture;
+
if (shouldStartLocalGroupNode) {
PartitionSet singlePartitionIdSet = PartitionSet.of(partitionId);
@@ -1893,11 +1865,13 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
);
}
+ int zoneId = getTableDescriptor(tbl.tableId(),
catalogService.latestCatalogVersion()).zoneId();
+
return startPartitionAndStartClient(
tbl,
replicaGrpId.partitionId(),
- pendingAssignments,
- nonStableNodeAssignmentsFinal,
+ localMemberAssignment,
+ computedStableAssignments,
zoneId,
isRecovery
);
@@ -1905,7 +1879,7 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
} else {
localServicesStartFuture = runAsync(() -> {
if (pendingAssignmentsAreForced &&
replicaMgr.isReplicaStarted(replicaGrpId)) {
- replicaMgr.resetPeers(replicaGrpId,
fromAssignments(nonStableNodeAssignmentsFinal.nodes()));
+ replicaMgr.resetPeers(replicaGrpId,
fromAssignments(computedStableAssignments.nodes()));
}
}, ioExecutor);
}
@@ -1913,14 +1887,14 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
return localServicesStartFuture.thenRunAsync(() -> {
// For forced assignments, we exclude dead stable nodes, and all
alive stable nodes are already in pending assignments.
// Union is not required in such a case.
- Set<Assignment> cfg = pendingAssignmentsAreForced
+ Set<Assignment> newAssignments = pendingAssignmentsAreForced ||
stableAssignments == null
? pendingAssignmentsNodes
- : union(pendingAssignmentsNodes, stableAssignments);
+ : union(pendingAssignmentsNodes,
stableAssignments.nodes());
tbl.internalTable()
.tableRaftService()
.partitionRaftGroupService(partitionId)
- .updateConfiguration(fromAssignments(cfg));
+ .updateConfiguration(fromAssignments(newAssignments));
}, ioExecutor);
}
@@ -2570,7 +2544,14 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
int zoneId = getTableDescriptor(tablePartitionId.tableId(),
catalogService.latestCatalogVersion()).zoneId();
- return startPartitionAndStartClient(table,
tablePartitionId.partitionId(), stableAssignments, null, zoneId, false);
+ return startPartitionAndStartClient(
+ table,
+ tablePartitionId.partitionId(),
+ localMemberAssignment(stableAssignments),
+ stableAssignments,
+ zoneId,
+ false
+ );
}, ioExecutor);
}), ioExecutor));
}