This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 19f11cb1bb7 IGNITE-17592 Fix synchronous await on futures in raft
group events rebalance listener (#5824)
19f11cb1bb7 is described below
commit 19f11cb1bb7478011344e9a7fad8c94f153ae763
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed May 21 17:28:44 2025 +0400
IGNITE-17592 Fix synchronous await on futures in raft group events
rebalance listener (#5824)
---
.../RebalanceRaftGroupEventsListener.java | 408 ++++++++++----------
.../ZoneRebalanceRaftGroupEventsListener.java | 412 +++++++++++----------
.../PartitionReplicaLifecycleManager.java | 1 -
3 files changed, 436 insertions(+), 385 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
index ca70ced520f..c9d50bdd76c 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
@@ -33,12 +33,11 @@ import static
org.apache.ignite.internal.metastorage.dsl.Operations.remove;
import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
import static org.apache.ignite.internal.util.CollectionUtils.difference;
import static org.apache.ignite.internal.util.CollectionUtils.intersect;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -181,19 +180,19 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
}
try {
- rebalanceScheduler.schedule(() -> {
+ Set<Assignment> stable = createAssignments(configuration);
+
+ rebalanceScheduler.execute(() -> {
if (!busyLock.enterBusy()) {
return;
}
try {
- Set<Assignment> stable = createAssignments(configuration);
-
- doStableKeySwitch(stable, tablePartitionId,
metaStorageMgr, term, index, calculateAssignmentsFn);
+ doStableKeySwitchWithExceptionHandling(stable,
tablePartitionId, term, index, calculateAssignmentsFn);
} finally {
busyLock.leaveBusy();
}
- }, 0, TimeUnit.MILLISECONDS);
+ });
} finally {
busyLock.leaveBusy();
}
@@ -273,34 +272,56 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
/**
* Updates stable value with the new applied assignment.
*/
- private void doStableKeySwitch(
+ private void doStableKeySwitchWithExceptionHandling(
Set<Assignment> stableFromRaft,
TablePartitionId tablePartitionId,
- MetaStorageManager metaStorageMgr,
long configurationTerm,
long configurationIndex,
BiFunction<TablePartitionId, Long,
CompletableFuture<Set<Assignment>>> calculateAssignmentsFn
) {
- try {
- ByteArray pendingPartAssignmentsKey =
pendingPartAssignmentsQueueKey(tablePartitionId);
- ByteArray stablePartAssignmentsKey =
stablePartAssignmentsKey(tablePartitionId);
- ByteArray plannedPartAssignmentsKey =
plannedPartAssignmentsKey(tablePartitionId);
- ByteArray switchReduceKey = switchReduceKey(tablePartitionId);
- ByteArray switchAppendKey = switchAppendKey(tablePartitionId);
- ByteArray assignmentsChainKey =
assignmentsChainKey(tablePartitionId);
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove
synchronous wait
- Map<ByteArray, Entry> values = metaStorageMgr.getAll(
- Set.of(
- plannedPartAssignmentsKey,
- pendingPartAssignmentsKey,
- stablePartAssignmentsKey,
- switchReduceKey,
- switchAppendKey,
- assignmentsChainKey
- )
- ).get();
+ doStableKeySwitch(
+ stableFromRaft,
+ tablePartitionId,
+ configurationTerm,
+ configurationIndex,
+ calculateAssignmentsFn
+ ).whenComplete((res, ex) -> {
+ // TODO: IGNITE-14693
+ if (ex != null && !hasCause(ex, NodeStoppingException.class)) {
+ if (hasCause(ex, TimeoutException.class)) {
+ // TODO:
https://issues.apache.org/jira/browse/IGNITE-25276 - handle this timeout
properly.
+ LOG.error("Unable to commit partition configuration to
metastore: {}", ex, tablePartitionId);
+ } else {
+ String errorMessage = String.format("Unable to commit
partition configuration to metastore: %s", tablePartitionId);
+ failureProcessor.process(new FailureContext(ex,
errorMessage));
+ }
+ }
+ });
+ }
+ private CompletableFuture<Void> doStableKeySwitch(
+ Set<Assignment> stableFromRaft,
+ TablePartitionId tablePartitionId,
+ long configurationTerm,
+ long configurationIndex,
+ BiFunction<TablePartitionId, Long,
CompletableFuture<Set<Assignment>>> calculateAssignmentsFn
+ ) {
+ ByteArray pendingPartAssignmentsKey =
pendingPartAssignmentsQueueKey(tablePartitionId);
+ ByteArray stablePartAssignmentsKey =
stablePartAssignmentsKey(tablePartitionId);
+ ByteArray plannedPartAssignmentsKey =
plannedPartAssignmentsKey(tablePartitionId);
+ ByteArray switchReduceKey = switchReduceKey(tablePartitionId);
+ ByteArray switchAppendKey = switchAppendKey(tablePartitionId);
+ ByteArray assignmentsChainKey = assignmentsChainKey(tablePartitionId);
+
+ Set<ByteArray> keysToGet = Set.of(
+ plannedPartAssignmentsKey,
+ pendingPartAssignmentsKey,
+ stablePartAssignmentsKey,
+ switchReduceKey,
+ switchAppendKey,
+ assignmentsChainKey
+ );
+ return metaStorageMgr.getAll(keysToGet).thenCompose(values -> {
Entry stableEntry = values.get(stablePartAssignmentsKey);
Entry pendingEntry = values.get(pendingPartAssignmentsKey);
Entry plannedEntry = values.get(plannedPartAssignmentsKey);
@@ -318,201 +339,210 @@ public class RebalanceRaftGroupEventsListener
implements RaftGroupEventsListener
Set<Assignment> retrievedPending = pendingAssignments.nodes();
if (!retrievedPending.equals(stableFromRaft)) {
- return;
+ return nullCompletedFuture();
}
// We wait for catalog metadata to be applied up to the provided
timestamp, so it should be safe to use the timestamp.
- Set<Assignment> calculatedAssignments =
calculateAssignmentsFn.apply(tablePartitionId, pendingAssignments.timestamp())
- .get();
+ return calculateAssignmentsFn.apply(tablePartitionId,
pendingAssignments.timestamp()).thenCompose(calculatedAssignments -> {
+ // Were reduced
+ Set<Assignment> reducedNodes =
difference(retrievedSwitchReduce, stableFromRaft);
- // Were reduced
- Set<Assignment> reducedNodes = difference(retrievedSwitchReduce,
stableFromRaft);
+ // Were added
+ Set<Assignment> addedNodes = difference(stableFromRaft,
retrievedStable);
- // Were added
- Set<Assignment> addedNodes = difference(stableFromRaft,
retrievedStable);
+ // For further reduction
+ Set<Assignment> calculatedSwitchReduce =
difference(retrievedSwitchReduce, reducedNodes);
- // For further reduction
- Set<Assignment> calculatedSwitchReduce =
difference(retrievedSwitchReduce, reducedNodes);
+ // For further addition
+ Set<Assignment> calculatedSwitchAppend =
union(retrievedSwitchAppend, reducedNodes);
+ calculatedSwitchAppend = difference(calculatedSwitchAppend,
addedNodes);
+ calculatedSwitchAppend = intersect(calculatedAssignments,
calculatedSwitchAppend);
- // For further addition
- Set<Assignment> calculatedSwitchAppend =
union(retrievedSwitchAppend, reducedNodes);
- calculatedSwitchAppend = difference(calculatedSwitchAppend,
addedNodes);
- calculatedSwitchAppend = intersect(calculatedAssignments,
calculatedSwitchAppend);
+ Set<Assignment> calculatedPendingReduction =
difference(stableFromRaft, retrievedSwitchReduce);
- Set<Assignment> calculatedPendingReduction =
difference(stableFromRaft, retrievedSwitchReduce);
+ Set<Assignment> calculatedPendingAddition =
union(stableFromRaft, reducedNodes);
+ calculatedPendingAddition = intersect(calculatedAssignments,
calculatedPendingAddition);
- Set<Assignment> calculatedPendingAddition = union(stableFromRaft,
reducedNodes);
- calculatedPendingAddition = intersect(calculatedAssignments,
calculatedPendingAddition);
+ // eq(revision(assignments.stable),
retrievedAssignmentsStable.revision)
+ SimpleCondition con1 = stableEntry.empty()
+ ? notExists(stablePartAssignmentsKey) :
+
revision(stablePartAssignmentsKey).eq(stableEntry.revision());
- // eq(revision(assignments.stable),
retrievedAssignmentsStable.revision)
- SimpleCondition con1 = stableEntry.empty()
- ? notExists(stablePartAssignmentsKey) :
-
revision(stablePartAssignmentsKey).eq(stableEntry.revision());
+ // eq(revision(assignments.pending),
retrievedAssignmentsPending.revision)
+ SimpleCondition con2 =
revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());
- // eq(revision(assignments.pending),
retrievedAssignmentsPending.revision)
- SimpleCondition con2 =
revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());
+ // eq(revision(assignments.switch.reduce),
retrievedAssignmentsSwitchReduce.revision)
+ SimpleCondition con3 = switchReduceEntry.empty()
+ ? notExists(switchReduceKey) :
revision(switchReduceKey).eq(switchReduceEntry.revision());
- // eq(revision(assignments.switch.reduce),
retrievedAssignmentsSwitchReduce.revision)
- SimpleCondition con3 = switchReduceEntry.empty()
- ? notExists(switchReduceKey) :
revision(switchReduceKey).eq(switchReduceEntry.revision());
+ // eq(revision(assignments.switch.append),
retrievedAssignmentsSwitchAppend.revision)
+ SimpleCondition con4 = switchAppendEntry.empty()
+ ? notExists(switchAppendKey) :
revision(switchAppendKey).eq(switchAppendEntry.revision());
- // eq(revision(assignments.switch.append),
retrievedAssignmentsSwitchAppend.revision)
- SimpleCondition con4 = switchAppendEntry.empty()
- ? notExists(switchAppendKey) :
revision(switchAppendKey).eq(switchAppendEntry.revision());
+ // All conditions combined with AND operator.
+ Condition retryPreconditions = and(con1, and(con2, and(con3,
con4)));
- // All conditions combined with AND operator.
- Condition retryPreconditions = and(con1, and(con2, and(con3,
con4)));
+ long catalogTimestamp = pendingAssignments.timestamp();
- long catalogTimestamp = pendingAssignments.timestamp();
+ Assignments newStableAssignments =
Assignments.of(stableFromRaft, catalogTimestamp);
- Assignments newStableAssignments = Assignments.of(stableFromRaft,
catalogTimestamp);
+ Operation assignmentChainChangeOp =
handleAssignmentsChainChange(
+ assignmentsChainKey,
+ assignmentsChainEntry,
+ pendingAssignments,
+ newStableAssignments,
+ configurationTerm,
+ configurationIndex
+ );
- Operation assignmentChainChangeOp = handleAssignmentsChainChange(
- assignmentsChainKey,
- assignmentsChainEntry,
- pendingAssignments,
- newStableAssignments,
- configurationTerm,
- configurationIndex
- );
+ Update successCase;
+ Update failCase;
- Update successCase;
- Update failCase;
-
- byte[] stableFromRaftByteArray = newStableAssignments.toBytes();
- byte[] additionByteArray =
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingAddition,
catalogTimestamp));
- byte[] reductionByteArray =
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingReduction,
catalogTimestamp));
- byte[] switchReduceByteArray =
Assignments.toBytes(calculatedSwitchReduce, catalogTimestamp);
- byte[] switchAppendByteArray =
Assignments.toBytes(calculatedSwitchAppend, catalogTimestamp);
-
- if (!calculatedSwitchAppend.isEmpty()) {
- successCase = ops(
- put(stablePartAssignmentsKey, stableFromRaftByteArray),
- put(pendingPartAssignmentsKey, additionByteArray),
- put(switchReduceKey, switchReduceByteArray),
- put(switchAppendKey, switchAppendByteArray),
- assignmentChainChangeOp
- ).yield(SWITCH_APPEND_SUCCESS);
- failCase = ops().yield(SWITCH_APPEND_FAIL);
- } else if (!calculatedSwitchReduce.isEmpty()) {
- successCase = ops(
- put(stablePartAssignmentsKey, stableFromRaftByteArray),
- put(pendingPartAssignmentsKey, reductionByteArray),
- put(switchReduceKey, switchReduceByteArray),
- put(switchAppendKey, switchAppendByteArray),
- assignmentChainChangeOp
- ).yield(SWITCH_REDUCE_SUCCESS);
- failCase = ops().yield(SWITCH_REDUCE_FAIL);
- } else {
- Condition con5;
- if (plannedEntry.value() != null) {
- // eq(revision(partition.assignments.planned),
plannedEntry.revision)
- con5 =
revision(plannedPartAssignmentsKey).eq(plannedEntry.revision());
+ byte[] stableFromRaftByteArray =
newStableAssignments.toBytes();
+ byte[] additionByteArray =
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingAddition,
catalogTimestamp));
+ byte[] reductionByteArray =
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingReduction,
catalogTimestamp));
+ byte[] switchReduceByteArray =
Assignments.toBytes(calculatedSwitchReduce, catalogTimestamp);
+ byte[] switchAppendByteArray =
Assignments.toBytes(calculatedSwitchAppend, catalogTimestamp);
+ if (!calculatedSwitchAppend.isEmpty()) {
successCase = ops(
put(stablePartAssignmentsKey,
stableFromRaftByteArray),
- put(pendingPartAssignmentsKey,
AssignmentsQueue.toBytes(Assignments.fromBytes(plannedEntry.value()))),
- remove(plannedPartAssignmentsKey),
+ put(pendingPartAssignmentsKey, additionByteArray),
+ put(switchReduceKey, switchReduceByteArray),
+ put(switchAppendKey, switchAppendByteArray),
assignmentChainChangeOp
- ).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);
-
- failCase = ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
- } else {
- // notExists(partition.assignments.planned)
- con5 = notExists(plannedPartAssignmentsKey);
-
+ ).yield(SWITCH_APPEND_SUCCESS);
+ failCase = ops().yield(SWITCH_APPEND_FAIL);
+ } else if (!calculatedSwitchReduce.isEmpty()) {
successCase = ops(
put(stablePartAssignmentsKey,
stableFromRaftByteArray),
- remove(pendingPartAssignmentsKey),
+ put(pendingPartAssignmentsKey, reductionByteArray),
+ put(switchReduceKey, switchReduceByteArray),
+ put(switchAppendKey, switchAppendByteArray),
assignmentChainChangeOp
- ).yield(FINISH_REBALANCE_SUCCESS);
-
- failCase = ops().yield(FINISH_REBALANCE_FAIL);
+ ).yield(SWITCH_REDUCE_SUCCESS);
+ failCase = ops().yield(SWITCH_REDUCE_FAIL);
+ } else {
+ Condition con5;
+ if (plannedEntry.value() != null) {
+ // eq(revision(partition.assignments.planned),
plannedEntry.revision)
+ con5 =
revision(plannedPartAssignmentsKey).eq(plannedEntry.revision());
+
+ successCase = ops(
+ put(stablePartAssignmentsKey,
stableFromRaftByteArray),
+ put(pendingPartAssignmentsKey,
AssignmentsQueue.toBytes(Assignments.fromBytes(plannedEntry.value()))),
+ remove(plannedPartAssignmentsKey),
+ assignmentChainChangeOp
+ ).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);
+
+ failCase =
ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
+ } else {
+ // notExists(partition.assignments.planned)
+ con5 = notExists(plannedPartAssignmentsKey);
+
+ successCase = ops(
+ put(stablePartAssignmentsKey,
stableFromRaftByteArray),
+ remove(pendingPartAssignmentsKey),
+ assignmentChainChangeOp
+ ).yield(FINISH_REBALANCE_SUCCESS);
+
+ failCase = ops().yield(FINISH_REBALANCE_FAIL);
+ }
+
+ retryPreconditions = and(retryPreconditions, con5);
}
- retryPreconditions = and(retryPreconditions, con5);
- }
+ Set<Assignment> finalCalculatedPendingAddition =
calculatedPendingAddition;
+ return metaStorageMgr.invoke(iif(retryPreconditions,
successCase, failCase)).thenCompose(statementResult -> {
+ int res = statementResult.getAsInt();
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove
synchronous wait
- int res = metaStorageMgr.invoke(iif(retryPreconditions,
successCase, failCase)).get().getAsInt();
+ if (res < 0) {
+ logSwitchFailure(res, stableFromRaft,
tablePartitionId);
- if (res < 0) {
- switch (res) {
- case SWITCH_APPEND_FAIL:
- LOG.info("Rebalance keys changed while trying to
update rebalance pending addition information. "
- + "Going to retry
[tablePartitionID={}, appliedPeers={}]",
- tablePartitionId, stableFromRaft
- );
- break;
- case SWITCH_REDUCE_FAIL:
- LOG.info("Rebalance keys changed while trying to
update rebalance pending reduce information. "
- + "Going to retry
[tablePartitionID={}, appliedPeers={}]",
- tablePartitionId, stableFromRaft
+ return doStableKeySwitch(
+ stableFromRaft,
+ tablePartitionId,
+ configurationTerm,
+ configurationIndex,
+ calculateAssignmentsFn
);
- break;
- case SCHEDULE_PENDING_REBALANCE_FAIL:
- case FINISH_REBALANCE_FAIL:
- LOG.info("Rebalance keys changed while trying to
update rebalance information. "
- + "Going to retry
[tablePartitionId={}, appliedPeers={}]",
- tablePartitionId, stableFromRaft
+ } else {
+ logSwitchSuccess(
+ res,
+ stableFromRaft,
+ tablePartitionId,
+ finalCalculatedPendingAddition,
+ calculatedPendingReduction,
+ plannedEntry
);
- break;
- default:
- assert false : res;
- break;
- }
+ return nullCompletedFuture();
+ }
+ });
+ });
+ });
+ }
- doStableKeySwitch(
- stableFromRaft,
- tablePartitionId,
- metaStorageMgr,
- configurationTerm,
- configurationIndex,
- calculateAssignmentsFn
+ private static void logSwitchFailure(int res, Set<Assignment>
stableFromRaft, TablePartitionId tablePartitionId) {
+ switch (res) {
+ case SWITCH_APPEND_FAIL:
+ LOG.info("Rebalance keys changed while trying to update
rebalance pending addition information. "
+ + "Going to retry [tablePartitionID={},
appliedPeers={}]",
+ tablePartitionId, stableFromRaft
);
+ break;
+ case SWITCH_REDUCE_FAIL:
+ LOG.info("Rebalance keys changed while trying to update
rebalance pending reduce information. "
+ + "Going to retry [tablePartitionID={},
appliedPeers={}]",
+ tablePartitionId, stableFromRaft
+ );
+ break;
+ case SCHEDULE_PENDING_REBALANCE_FAIL:
+ case FINISH_REBALANCE_FAIL:
+ LOG.info("Rebalance keys changed while trying to update
rebalance information. "
+ + "Going to retry [tablePartitionId={},
appliedPeers={}]",
+ tablePartitionId, stableFromRaft
+ );
+ break;
+ default:
+ assert false : res;
+ break;
+ }
+ }
- return;
- }
-
- switch (res) {
- case SWITCH_APPEND_SUCCESS:
- LOG.info("Rebalance finished. Going to schedule next
rebalance with addition"
- + " [tablePartitionId={}, appliedPeers={},
plannedPeers={}]",
- tablePartitionId, stableFromRaft,
calculatedPendingAddition
- );
- break;
- case SWITCH_REDUCE_SUCCESS:
- LOG.info("Rebalance finished. Going to schedule next
rebalance with reduction"
- + " [tablePartitionId={}, appliedPeers={},
plannedPeers={}]",
- tablePartitionId, stableFromRaft,
calculatedPendingReduction
- );
- break;
- case SCHEDULE_PENDING_REBALANCE_SUCCESS:
- LOG.info(
- "Rebalance finished. Going to schedule next
rebalance [tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
- tablePartitionId, stableFromRaft,
Assignments.fromBytes(plannedEntry.value()).nodes()
- );
- break;
- case FINISH_REBALANCE_SUCCESS:
- LOG.info("Rebalance finished [tablePartitionId={},
appliedPeers={}]", tablePartitionId, stableFromRaft);
- break;
-
- default:
- assert false : res;
- break;
- }
-
- } catch (InterruptedException | ExecutionException e) {
- // TODO: IGNITE-14693
- if (!hasCause(e, NodeStoppingException.class)) {
- if (hasCause(e, TimeoutException.class)) {
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-25276 - handle this timeout
properly.
- LOG.error("Unable to commit partition configuration to
metastore: {}", e, tablePartitionId);
- } else {
- String errorMessage = String.format("Unable to commit
partition configuration to metastore: %s", tablePartitionId);
- failureProcessor.process(new FailureContext(e,
errorMessage));
- }
- }
+ private static void logSwitchSuccess(
+ int res,
+ Set<Assignment> stableFromRaft,
+ TablePartitionId tablePartitionId,
+ Set<Assignment> calculatedPendingAddition,
+ Set<Assignment> calculatedPendingReduction,
+ Entry plannedEntry
+ ) {
+ switch (res) {
+ case SWITCH_APPEND_SUCCESS:
+ LOG.info("Rebalance finished. Going to schedule next rebalance
with addition"
+ + " [tablePartitionId={}, appliedPeers={},
plannedPeers={}]",
+ tablePartitionId, stableFromRaft,
calculatedPendingAddition
+ );
+ break;
+ case SWITCH_REDUCE_SUCCESS:
+ LOG.info("Rebalance finished. Going to schedule next rebalance
with reduction"
+ + " [tablePartitionId={}, appliedPeers={},
plannedPeers={}]",
+ tablePartitionId, stableFromRaft,
calculatedPendingReduction
+ );
+ break;
+ case SCHEDULE_PENDING_REBALANCE_SUCCESS:
+ LOG.info(
+ "Rebalance finished. Going to schedule next rebalance
[tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
+ tablePartitionId, stableFromRaft,
Assignments.fromBytes(plannedEntry.value()).nodes()
+ );
+ break;
+ case FINISH_REBALANCE_SUCCESS:
+ LOG.info("Rebalance finished [tablePartitionId={},
appliedPeers={}]", tablePartitionId, stableFromRaft);
+ break;
+
+ default:
+ assert false : res;
+ break;
}
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
index c95314c8705..0dc5aeffece 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
@@ -41,10 +41,8 @@ import static
org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import java.util.Collection;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -241,26 +239,19 @@ public class ZoneRebalanceRaftGroupEventsListener
implements RaftGroupEventsList
}
try {
- rebalanceScheduler.schedule(() -> {
+ Set<Assignment> stable = createAssignments(configuration);
+
+ rebalanceScheduler.execute(() -> {
if (!busyLock.enterBusy()) {
return;
}
try {
- Set<Assignment> stable = createAssignments(configuration);
-
- doStableKeySwitch(
- stable,
- zonePartitionId,
- metaStorageMgr,
- term,
- index,
- calculateAssignmentsFn
- );
+ doStableKeySwitchWithExceptionHandling(stable,
zonePartitionId, term, index, calculateAssignmentsFn);
} finally {
busyLock.leaveBusy();
}
- }, 0, TimeUnit.MILLISECONDS);
+ });
} finally {
busyLock.leaveBusy();
}
@@ -340,34 +331,56 @@ public class ZoneRebalanceRaftGroupEventsListener
implements RaftGroupEventsList
/**
* Updates stable value with the new applied assignment.
*/
- private void doStableKeySwitch(
+ private void doStableKeySwitchWithExceptionHandling(
Set<Assignment> stableFromRaft,
ZonePartitionId zonePartitionId,
- MetaStorageManager metaStorageMgr,
long configurationTerm,
long configurationIndex,
BiFunction<ZonePartitionId, Long,
CompletableFuture<Set<Assignment>>> calculateAssignmentsFn
) {
- try {
- ByteArray pendingPartAssignmentsKey =
pendingPartAssignmentsQueueKey(zonePartitionId);
- ByteArray stablePartAssignmentsKey =
stablePartAssignmentsKey(zonePartitionId);
- ByteArray plannedPartAssignmentsKey =
plannedPartAssignmentsKey(zonePartitionId);
- ByteArray switchReduceKey = switchReduceKey(zonePartitionId);
- ByteArray switchAppendKey = switchAppendKey(zonePartitionId);
- ByteArray assignmentsChainKey =
assignmentsChainKey(zonePartitionId);
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove
synchronous wait
- Map<ByteArray, Entry> values = metaStorageMgr.getAll(
- Set.of(
- plannedPartAssignmentsKey,
- pendingPartAssignmentsKey,
- stablePartAssignmentsKey,
- switchReduceKey,
- switchAppendKey,
- assignmentsChainKey
- )
- ).get();
+ doStableKeySwitch(
+ stableFromRaft,
+ zonePartitionId,
+ configurationTerm,
+ configurationIndex,
+ calculateAssignmentsFn
+ ).whenComplete((res, ex) -> {
+ // TODO: IGNITE-14693
+ if (ex != null && !hasCause(ex, NodeStoppingException.class)) {
+ if (hasCause(ex, TimeoutException.class)) {
+ // TODO:
https://issues.apache.org/jira/browse/IGNITE-25276 - handle this timeout
properly.
+ LOG.error("Unable to commit partition configuration to
metastore: {}", ex, zonePartitionId);
+ } else {
+ String errorMessage = String.format("Unable to commit
partition configuration to metastore: %s", zonePartitionId);
+ failureProcessor.process(new FailureContext(ex,
errorMessage));
+ }
+ }
+ });
+ }
+ private CompletableFuture<Void> doStableKeySwitch(
+ Set<Assignment> stableFromRaft,
+ ZonePartitionId zonePartitionId,
+ long configurationTerm,
+ long configurationIndex,
+ BiFunction<ZonePartitionId, Long,
CompletableFuture<Set<Assignment>>> calculateAssignmentsFn
+ ) {
+ ByteArray pendingPartAssignmentsKey =
pendingPartAssignmentsQueueKey(zonePartitionId);
+ ByteArray stablePartAssignmentsKey =
stablePartAssignmentsKey(zonePartitionId);
+ ByteArray plannedPartAssignmentsKey =
plannedPartAssignmentsKey(zonePartitionId);
+ ByteArray switchReduceKey = switchReduceKey(zonePartitionId);
+ ByteArray switchAppendKey = switchAppendKey(zonePartitionId);
+ ByteArray assignmentsChainKey = assignmentsChainKey(zonePartitionId);
+
+ Set<ByteArray> keysToGet = Set.of(
+ plannedPartAssignmentsKey,
+ pendingPartAssignmentsKey,
+ stablePartAssignmentsKey,
+ switchReduceKey,
+ switchAppendKey,
+ assignmentsChainKey
+ );
+ return metaStorageMgr.getAll(keysToGet).thenCompose(values -> {
Entry stableEntry = values.get(stablePartAssignmentsKey);
Entry pendingEntry = values.get(pendingPartAssignmentsKey);
Entry plannedEntry = values.get(plannedPartAssignmentsKey);
@@ -384,201 +397,210 @@ public class ZoneRebalanceRaftGroupEventsListener
implements RaftGroupEventsList
Set<Assignment> retrievedPending = pendingAssignments.nodes();
if (!retrievedPending.equals(stableFromRaft)) {
- return;
+ return nullCompletedFuture();
}
// We wait for catalog metadata to be applied up to the provided
timestamp, so it should be safe to use the timestamp.
- Set<Assignment> calculatedAssignments =
calculateAssignmentsFn.apply(zonePartitionId, pendingAssignments.timestamp())
- .get();
+ return calculateAssignmentsFn.apply(zonePartitionId,
pendingAssignments.timestamp()).thenCompose(calculatedAssignments -> {
+ // Were reduced
+ Set<Assignment> reducedNodes =
difference(retrievedSwitchReduce, stableFromRaft);
- // Were reduced
- Set<Assignment> reducedNodes = difference(retrievedSwitchReduce,
stableFromRaft);
+ // Were added
+ Set<Assignment> addedNodes = difference(stableFromRaft,
retrievedStable);
- // Were added
- Set<Assignment> addedNodes = difference(stableFromRaft,
retrievedStable);
+ // For further reduction
+ Set<Assignment> calculatedSwitchReduce =
difference(retrievedSwitchReduce, reducedNodes);
- // For further reduction
- Set<Assignment> calculatedSwitchReduce =
difference(retrievedSwitchReduce, reducedNodes);
+ // For further addition
+ Set<Assignment> calculatedSwitchAppend =
union(retrievedSwitchAppend, reducedNodes);
+ calculatedSwitchAppend = difference(calculatedSwitchAppend,
addedNodes);
+ calculatedSwitchAppend = intersect(calculatedAssignments,
calculatedSwitchAppend);
- // For further addition
- Set<Assignment> calculatedSwitchAppend =
union(retrievedSwitchAppend, reducedNodes);
- calculatedSwitchAppend = difference(calculatedSwitchAppend,
addedNodes);
- calculatedSwitchAppend = intersect(calculatedAssignments,
calculatedSwitchAppend);
+ Set<Assignment> calculatedPendingReduction =
difference(stableFromRaft, retrievedSwitchReduce);
- Set<Assignment> calculatedPendingReduction =
difference(stableFromRaft, retrievedSwitchReduce);
+ Set<Assignment> calculatedPendingAddition =
union(stableFromRaft, reducedNodes);
+ calculatedPendingAddition = intersect(calculatedAssignments,
calculatedPendingAddition);
- Set<Assignment> calculatedPendingAddition = union(stableFromRaft,
reducedNodes);
- calculatedPendingAddition = intersect(calculatedAssignments,
calculatedPendingAddition);
+ // eq(revision(assignments.stable),
retrievedAssignmentsStable.revision)
+ SimpleCondition con1 = stableEntry.empty()
+ ? notExists(stablePartAssignmentsKey) :
+
revision(stablePartAssignmentsKey).eq(stableEntry.revision());
- // eq(revision(assignments.stable),
retrievedAssignmentsStable.revision)
- SimpleCondition con1 = stableEntry.empty()
- ? notExists(stablePartAssignmentsKey) :
-
revision(stablePartAssignmentsKey).eq(stableEntry.revision());
+ // eq(revision(assignments.pending),
retrievedAssignmentsPending.revision)
+ SimpleCondition con2 =
revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());
- // eq(revision(assignments.pending),
retrievedAssignmentsPending.revision)
- SimpleCondition con2 =
revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());
+ // eq(revision(assignments.switch.reduce),
retrievedAssignmentsSwitchReduce.revision)
+ SimpleCondition con3 = switchReduceEntry.empty()
+ ? notExists(switchReduceKey) :
revision(switchReduceKey).eq(switchReduceEntry.revision());
- // eq(revision(assignments.switch.reduce),
retrievedAssignmentsSwitchReduce.revision)
- SimpleCondition con3 = switchReduceEntry.empty()
- ? notExists(switchReduceKey) :
revision(switchReduceKey).eq(switchReduceEntry.revision());
+ // eq(revision(assignments.switch.append),
retrievedAssignmentsSwitchAppend.revision)
+ SimpleCondition con4 = switchAppendEntry.empty()
+ ? notExists(switchAppendKey) :
revision(switchAppendKey).eq(switchAppendEntry.revision());
- // eq(revision(assignments.switch.append),
retrievedAssignmentsSwitchAppend.revision)
- SimpleCondition con4 = switchAppendEntry.empty()
- ? notExists(switchAppendKey) :
revision(switchAppendKey).eq(switchAppendEntry.revision());
+ // All conditions combined with AND operator.
+ Condition retryPreconditions = and(con1, and(con2, and(con3,
con4)));
- // All conditions combined with AND operator.
- Condition retryPreconditions = and(con1, and(con2, and(con3,
con4)));
+ long catalogTimestamp = pendingAssignments.timestamp();
- long catalogTimestamp = pendingAssignments.timestamp();
+ Assignments newStableAssignments =
Assignments.of(stableFromRaft, catalogTimestamp);
- Assignments newStableAssignments = Assignments.of(stableFromRaft,
catalogTimestamp);
+ Operation assignmentChainChangeOp =
handleAssignmentsChainChange(
+ assignmentsChainKey,
+ assignmentsChainEntry,
+ pendingAssignments,
+ newStableAssignments,
+ configurationTerm,
+ configurationIndex
+ );
- Operation assignmentChainChangeOp = handleAssignmentsChainChange(
- assignmentsChainKey,
- assignmentsChainEntry,
- pendingAssignments,
- newStableAssignments,
- configurationTerm,
- configurationIndex
- );
+ Update successCase;
+ Update failCase;
- Update successCase;
- Update failCase;
-
- byte[] stableFromRaftByteArray = newStableAssignments.toBytes();
- byte[] additionByteArray =
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingAddition,
catalogTimestamp));
- byte[] reductionByteArray =
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingReduction,
catalogTimestamp));
- byte[] switchReduceByteArray =
Assignments.toBytes(calculatedSwitchReduce, catalogTimestamp);
- byte[] switchAppendByteArray =
Assignments.toBytes(calculatedSwitchAppend, catalogTimestamp);
-
- if (!calculatedSwitchAppend.isEmpty()) {
- successCase = ops(
- put(stablePartAssignmentsKey, stableFromRaftByteArray),
- put(pendingPartAssignmentsKey, additionByteArray),
- put(switchReduceKey, switchReduceByteArray),
- put(switchAppendKey, switchAppendByteArray),
- assignmentChainChangeOp
- ).yield(SWITCH_APPEND_SUCCESS);
- failCase = ops().yield(SWITCH_APPEND_FAIL);
- } else if (!calculatedSwitchReduce.isEmpty()) {
- successCase = ops(
- put(stablePartAssignmentsKey, stableFromRaftByteArray),
- put(pendingPartAssignmentsKey, reductionByteArray),
- put(switchReduceKey, switchReduceByteArray),
- put(switchAppendKey, switchAppendByteArray),
- assignmentChainChangeOp
- ).yield(SWITCH_REDUCE_SUCCESS);
- failCase = ops().yield(SWITCH_REDUCE_FAIL);
- } else {
- Condition con5;
- if (plannedEntry.value() != null) {
- // eq(revision(partition.assignments.planned),
plannedEntry.revision)
- con5 =
revision(plannedPartAssignmentsKey).eq(plannedEntry.revision());
+ byte[] stableFromRaftByteArray =
newStableAssignments.toBytes();
+ byte[] additionByteArray =
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingAddition,
catalogTimestamp));
+ byte[] reductionByteArray =
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingReduction,
catalogTimestamp));
+ byte[] switchReduceByteArray =
Assignments.toBytes(calculatedSwitchReduce, catalogTimestamp);
+ byte[] switchAppendByteArray =
Assignments.toBytes(calculatedSwitchAppend, catalogTimestamp);
+ if (!calculatedSwitchAppend.isEmpty()) {
successCase = ops(
put(stablePartAssignmentsKey,
stableFromRaftByteArray),
- put(pendingPartAssignmentsKey,
AssignmentsQueue.toBytes(Assignments.fromBytes(plannedEntry.value()))),
- remove(plannedPartAssignmentsKey),
+ put(pendingPartAssignmentsKey, additionByteArray),
+ put(switchReduceKey, switchReduceByteArray),
+ put(switchAppendKey, switchAppendByteArray),
assignmentChainChangeOp
- ).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);
-
- failCase = ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
- } else {
- // notExists(partition.assignments.planned)
- con5 = notExists(plannedPartAssignmentsKey);
-
+ ).yield(SWITCH_APPEND_SUCCESS);
+ failCase = ops().yield(SWITCH_APPEND_FAIL);
+ } else if (!calculatedSwitchReduce.isEmpty()) {
successCase = ops(
put(stablePartAssignmentsKey,
stableFromRaftByteArray),
- remove(pendingPartAssignmentsKey),
+ put(pendingPartAssignmentsKey, reductionByteArray),
+ put(switchReduceKey, switchReduceByteArray),
+ put(switchAppendKey, switchAppendByteArray),
assignmentChainChangeOp
- ).yield(FINISH_REBALANCE_SUCCESS);
+ ).yield(SWITCH_REDUCE_SUCCESS);
+ failCase = ops().yield(SWITCH_REDUCE_FAIL);
+ } else {
+ Condition con5;
+ if (plannedEntry.value() != null) {
+ // eq(revision(partition.assignments.planned),
plannedEntry.revision)
+ con5 =
revision(plannedPartAssignmentsKey).eq(plannedEntry.revision());
+
+ successCase = ops(
+ put(stablePartAssignmentsKey,
stableFromRaftByteArray),
+ put(pendingPartAssignmentsKey,
AssignmentsQueue.toBytes(Assignments.fromBytes(plannedEntry.value()))),
+ remove(plannedPartAssignmentsKey),
+ assignmentChainChangeOp
+ ).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);
+
+ failCase =
ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
+ } else {
+ // notExists(partition.assignments.planned)
+ con5 = notExists(plannedPartAssignmentsKey);
+
+ successCase = ops(
+ put(stablePartAssignmentsKey,
stableFromRaftByteArray),
+ remove(pendingPartAssignmentsKey),
+ assignmentChainChangeOp
+ ).yield(FINISH_REBALANCE_SUCCESS);
+
+ failCase = ops().yield(FINISH_REBALANCE_FAIL);
+ }
- failCase = ops().yield(FINISH_REBALANCE_FAIL);
+ retryPreconditions = and(retryPreconditions, con5);
}
- retryPreconditions = and(retryPreconditions, con5);
- }
+ Set<Assignment> finalCalculatedPendingAddition =
calculatedPendingAddition;
+ return metaStorageMgr.invoke(iif(retryPreconditions,
successCase, failCase)).thenCompose(statementResult -> {
+ int res = statementResult.getAsInt();
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove
synchronous wait
- int res = metaStorageMgr.invoke(iif(retryPreconditions,
successCase, failCase)).get().getAsInt();
+ if (res < 0) {
+ logSwitchFailure(res, stableFromRaft, zonePartitionId);
- if (res < 0) {
- switch (res) {
- case SWITCH_APPEND_FAIL:
- LOG.info("Rebalance keys changed while trying to
update rebalance pending addition information. "
- + "Going to retry [zonePartitionId={},
appliedPeers={}]",
- zonePartitionId, stableFromRaft
- );
- break;
- case SWITCH_REDUCE_FAIL:
- LOG.info("Rebalance keys changed while trying to
update rebalance pending reduce information. "
- + "Going to retry [zonePartitionId={},
appliedPeers={}]",
- zonePartitionId, stableFromRaft
+ return doStableKeySwitch(
+ stableFromRaft,
+ zonePartitionId,
+ configurationTerm,
+ configurationIndex,
+ calculateAssignmentsFn
);
- break;
- case SCHEDULE_PENDING_REBALANCE_FAIL:
- case FINISH_REBALANCE_FAIL:
- LOG.info("Rebalance keys changed while trying to
update rebalance information. "
- + "Going to retry [zonePartitionId={},
appliedPeers={}]",
- zonePartitionId, stableFromRaft
+ } else {
+ logSwitchSuccess(
+ res,
+ stableFromRaft,
+ zonePartitionId,
+ finalCalculatedPendingAddition,
+ calculatedPendingReduction,
+ plannedEntry
);
- break;
- default:
- assert false : res;
- break;
- }
+ return nullCompletedFuture();
+ }
+ });
+ });
+ });
+ }
- doStableKeySwitch(
- stableFromRaft,
- zonePartitionId,
- metaStorageMgr,
- configurationTerm,
- configurationIndex,
- calculateAssignmentsFn
+ private static void logSwitchFailure(int res, Set<Assignment>
stableFromRaft, ZonePartitionId zonePartitionId) {
+ switch (res) {
+ case SWITCH_APPEND_FAIL:
+ LOG.info("Rebalance keys changed while trying to update
rebalance pending addition information. "
+ + "Going to retry [zonePartitionId={},
appliedPeers={}]",
+ zonePartitionId, stableFromRaft
);
+ break;
+ case SWITCH_REDUCE_FAIL:
+ LOG.info("Rebalance keys changed while trying to update
rebalance pending reduce information. "
+ + "Going to retry [zonePartitionId={},
appliedPeers={}]",
+ zonePartitionId, stableFromRaft
+ );
+ break;
+ case SCHEDULE_PENDING_REBALANCE_FAIL:
+ case FINISH_REBALANCE_FAIL:
+ LOG.info("Rebalance keys changed while trying to update
rebalance information. "
+ + "Going to retry [zonePartitionId={},
appliedPeers={}]",
+ zonePartitionId, stableFromRaft
+ );
+ break;
+ default:
+ assert false : res;
+ break;
+ }
+ }
- return;
- }
-
- switch (res) {
- case SWITCH_APPEND_SUCCESS:
- LOG.info("Rebalance finished. Going to schedule next
rebalance with addition"
- + " [zonePartitionId={}, appliedPeers={},
plannedPeers={}]",
- zonePartitionId, stableFromRaft,
calculatedPendingAddition
- );
- break;
- case SWITCH_REDUCE_SUCCESS:
- LOG.info("Rebalance finished. Going to schedule next
rebalance with reduction"
- + " [zonePartitionId={}, appliedPeers={},
plannedPeers={}]",
- zonePartitionId, stableFromRaft,
calculatedPendingReduction
- );
- break;
- case SCHEDULE_PENDING_REBALANCE_SUCCESS:
- LOG.info(
- "Rebalance finished. Going to schedule next
rebalance [zonePartitionId={}, appliedPeers={}, plannedPeers={}]",
- zonePartitionId, stableFromRaft,
Assignments.fromBytes(plannedEntry.value()).nodes()
- );
- break;
- case FINISH_REBALANCE_SUCCESS:
- LOG.info("Rebalance finished [zonePartitionId={},
appliedPeers={}]", zonePartitionId, stableFromRaft);
- break;
-
- default:
- assert false : res;
- break;
- }
-
- } catch (InterruptedException | ExecutionException e) {
- // TODO: IGNITE-14693
- if (!hasCause(e, NodeStoppingException.class)) {
- if (hasCause(e, TimeoutException.class)) {
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-25276 - handle this timeout
properly.
- LOG.error("Unable to commit partition configuration to
metastore: {}", e, zonePartitionId);
- } else {
- String errorMessage = String.format("Unable to commit
partition configuration to metastore: %s", zonePartitionId);
- failureProcessor.process(new FailureContext(e,
errorMessage));
- }
- }
+ private static void logSwitchSuccess(
+ int res,
+ Set<Assignment> stableFromRaft,
+ ZonePartitionId zonePartitionId,
+ Set<Assignment> calculatedPendingAddition,
+ Set<Assignment> calculatedPendingReduction,
+ Entry plannedEntry
+ ) {
+ switch (res) {
+ case SWITCH_APPEND_SUCCESS:
+ LOG.info("Rebalance finished. Going to schedule next rebalance
with addition"
+ + " [zonePartitionId={}, appliedPeers={},
plannedPeers={}]",
+ zonePartitionId, stableFromRaft,
calculatedPendingAddition
+ );
+ break;
+ case SWITCH_REDUCE_SUCCESS:
+ LOG.info("Rebalance finished. Going to schedule next rebalance
with reduction"
+ + " [zonePartitionId={}, appliedPeers={},
plannedPeers={}]",
+ zonePartitionId, stableFromRaft,
calculatedPendingReduction
+ );
+ break;
+ case SCHEDULE_PENDING_REBALANCE_SUCCESS:
+ LOG.info(
+ "Rebalance finished. Going to schedule next rebalance
[zonePartitionId={}, appliedPeers={}, plannedPeers={}]",
+ zonePartitionId, stableFromRaft,
Assignments.fromBytes(plannedEntry.value()).nodes()
+ );
+ break;
+ case FINISH_REBALANCE_SUCCESS:
+ LOG.info("Rebalance finished [zonePartitionId={},
appliedPeers={}]", zonePartitionId, stableFromRaft);
+ break;
+
+ default:
+ assert false : res;
+ break;
}
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 8ddaae06460..3709f098da1 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -255,7 +255,6 @@ public class PartitionReplicaLifecycleManager extends
/**
* This future completes on {@link #beforeNodeStop()} with {@link
NodeStoppingException} before the {@link #busyLock} is blocked.
*/
- // TODO: https://issues.apache.org/jira/browse/IGNITE-17592
private final CompletableFuture<Void> stopReplicaLifecycleFuture = new
CompletableFuture<>();
/**