This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 19f11cb1bb7 IGNITE-17592 Fix synchronous await on futures in raft group events rebalance listener (#5824)
19f11cb1bb7 is described below

commit 19f11cb1bb7478011344e9a7fad8c94f153ae763
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed May 21 17:28:44 2025 +0400

    IGNITE-17592 Fix synchronous await on futures in raft group events rebalance listener (#5824)
---
 .../RebalanceRaftGroupEventsListener.java          | 408 ++++++++++----------
 .../ZoneRebalanceRaftGroupEventsListener.java      | 412 +++++++++++----------
 .../PartitionReplicaLifecycleManager.java          |   1 -
 3 files changed, 436 insertions(+), 385 deletions(-)

diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
index ca70ced520f..c9d50bdd76c 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java
@@ -33,12 +33,11 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Operations.remove;
 import static org.apache.ignite.internal.metastorage.dsl.Statements.iif;
 import static org.apache.ignite.internal.util.CollectionUtils.difference;
 import static org.apache.ignite.internal.util.CollectionUtils.intersect;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
 
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -181,19 +180,19 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
         }
 
         try {
-            rebalanceScheduler.schedule(() -> {
+            Set<Assignment> stable = createAssignments(configuration);
+
+            rebalanceScheduler.execute(() -> {
                 if (!busyLock.enterBusy()) {
                     return;
                 }
 
                 try {
-                    Set<Assignment> stable = createAssignments(configuration);
-
-                    doStableKeySwitch(stable, tablePartitionId, 
metaStorageMgr, term, index, calculateAssignmentsFn);
+                    doStableKeySwitchWithExceptionHandling(stable, 
tablePartitionId, term, index, calculateAssignmentsFn);
                 } finally {
                     busyLock.leaveBusy();
                 }
-            }, 0, TimeUnit.MILLISECONDS);
+            });
         } finally {
             busyLock.leaveBusy();
         }
@@ -273,34 +272,56 @@ public class RebalanceRaftGroupEventsListener implements 
RaftGroupEventsListener
     /**
      * Updates stable value with the new applied assignment.
      */
-    private void doStableKeySwitch(
+    private void doStableKeySwitchWithExceptionHandling(
             Set<Assignment> stableFromRaft,
             TablePartitionId tablePartitionId,
-            MetaStorageManager metaStorageMgr,
             long configurationTerm,
             long configurationIndex,
             BiFunction<TablePartitionId, Long, 
CompletableFuture<Set<Assignment>>> calculateAssignmentsFn
     ) {
-        try {
-            ByteArray pendingPartAssignmentsKey = 
pendingPartAssignmentsQueueKey(tablePartitionId);
-            ByteArray stablePartAssignmentsKey = 
stablePartAssignmentsKey(tablePartitionId);
-            ByteArray plannedPartAssignmentsKey = 
plannedPartAssignmentsKey(tablePartitionId);
-            ByteArray switchReduceKey = switchReduceKey(tablePartitionId);
-            ByteArray switchAppendKey = switchAppendKey(tablePartitionId);
-            ByteArray assignmentsChainKey = 
assignmentsChainKey(tablePartitionId);
-
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove 
synchronous wait
-            Map<ByteArray, Entry> values = metaStorageMgr.getAll(
-                    Set.of(
-                            plannedPartAssignmentsKey,
-                            pendingPartAssignmentsKey,
-                            stablePartAssignmentsKey,
-                            switchReduceKey,
-                            switchAppendKey,
-                            assignmentsChainKey
-                    )
-            ).get();
+        doStableKeySwitch(
+                stableFromRaft,
+                tablePartitionId,
+                configurationTerm,
+                configurationIndex,
+                calculateAssignmentsFn
+        ).whenComplete((res, ex) -> {
+            // TODO: IGNITE-14693
+            if (ex != null && !hasCause(ex, NodeStoppingException.class)) {
+                if (hasCause(ex, TimeoutException.class)) {
+                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-25276 - handle this timeout 
properly.
+                    LOG.error("Unable to commit partition configuration to 
metastore: {}", ex, tablePartitionId);
+                } else {
+                    String errorMessage = String.format("Unable to commit 
partition configuration to metastore: %s", tablePartitionId);
+                    failureProcessor.process(new FailureContext(ex, 
errorMessage));
+                }
+            }
+        });
+    }
 
+    private CompletableFuture<Void> doStableKeySwitch(
+            Set<Assignment> stableFromRaft,
+            TablePartitionId tablePartitionId,
+            long configurationTerm,
+            long configurationIndex,
+            BiFunction<TablePartitionId, Long, 
CompletableFuture<Set<Assignment>>> calculateAssignmentsFn
+    ) {
+        ByteArray pendingPartAssignmentsKey = 
pendingPartAssignmentsQueueKey(tablePartitionId);
+        ByteArray stablePartAssignmentsKey = 
stablePartAssignmentsKey(tablePartitionId);
+        ByteArray plannedPartAssignmentsKey = 
plannedPartAssignmentsKey(tablePartitionId);
+        ByteArray switchReduceKey = switchReduceKey(tablePartitionId);
+        ByteArray switchAppendKey = switchAppendKey(tablePartitionId);
+        ByteArray assignmentsChainKey = assignmentsChainKey(tablePartitionId);
+
+        Set<ByteArray> keysToGet = Set.of(
+                plannedPartAssignmentsKey,
+                pendingPartAssignmentsKey,
+                stablePartAssignmentsKey,
+                switchReduceKey,
+                switchAppendKey,
+                assignmentsChainKey
+        );
+        return metaStorageMgr.getAll(keysToGet).thenCompose(values -> {
             Entry stableEntry = values.get(stablePartAssignmentsKey);
             Entry pendingEntry = values.get(pendingPartAssignmentsKey);
             Entry plannedEntry = values.get(plannedPartAssignmentsKey);
@@ -318,201 +339,210 @@ public class RebalanceRaftGroupEventsListener 
implements RaftGroupEventsListener
             Set<Assignment> retrievedPending = pendingAssignments.nodes();
 
             if (!retrievedPending.equals(stableFromRaft)) {
-                return;
+                return nullCompletedFuture();
             }
 
             // We wait for catalog metadata to be applied up to the provided 
timestamp, so it should be safe to use the timestamp.
-            Set<Assignment> calculatedAssignments = 
calculateAssignmentsFn.apply(tablePartitionId, pendingAssignments.timestamp())
-                    .get();
+            return calculateAssignmentsFn.apply(tablePartitionId, 
pendingAssignments.timestamp()).thenCompose(calculatedAssignments -> {
+                // Were reduced
+                Set<Assignment> reducedNodes = 
difference(retrievedSwitchReduce, stableFromRaft);
 
-            // Were reduced
-            Set<Assignment> reducedNodes = difference(retrievedSwitchReduce, 
stableFromRaft);
+                // Were added
+                Set<Assignment> addedNodes = difference(stableFromRaft, 
retrievedStable);
 
-            // Were added
-            Set<Assignment> addedNodes = difference(stableFromRaft, 
retrievedStable);
+                // For further reduction
+                Set<Assignment> calculatedSwitchReduce = 
difference(retrievedSwitchReduce, reducedNodes);
 
-            // For further reduction
-            Set<Assignment> calculatedSwitchReduce = 
difference(retrievedSwitchReduce, reducedNodes);
+                // For further addition
+                Set<Assignment> calculatedSwitchAppend = 
union(retrievedSwitchAppend, reducedNodes);
+                calculatedSwitchAppend = difference(calculatedSwitchAppend, 
addedNodes);
+                calculatedSwitchAppend = intersect(calculatedAssignments, 
calculatedSwitchAppend);
 
-            // For further addition
-            Set<Assignment> calculatedSwitchAppend = 
union(retrievedSwitchAppend, reducedNodes);
-            calculatedSwitchAppend = difference(calculatedSwitchAppend, 
addedNodes);
-            calculatedSwitchAppend = intersect(calculatedAssignments, 
calculatedSwitchAppend);
+                Set<Assignment> calculatedPendingReduction = 
difference(stableFromRaft, retrievedSwitchReduce);
 
-            Set<Assignment> calculatedPendingReduction = 
difference(stableFromRaft, retrievedSwitchReduce);
+                Set<Assignment> calculatedPendingAddition = 
union(stableFromRaft, reducedNodes);
+                calculatedPendingAddition = intersect(calculatedAssignments, 
calculatedPendingAddition);
 
-            Set<Assignment> calculatedPendingAddition = union(stableFromRaft, 
reducedNodes);
-            calculatedPendingAddition = intersect(calculatedAssignments, 
calculatedPendingAddition);
+                // eq(revision(assignments.stable), 
retrievedAssignmentsStable.revision)
+                SimpleCondition con1 = stableEntry.empty()
+                        ? notExists(stablePartAssignmentsKey) :
+                        
revision(stablePartAssignmentsKey).eq(stableEntry.revision());
 
-            // eq(revision(assignments.stable), 
retrievedAssignmentsStable.revision)
-            SimpleCondition con1 = stableEntry.empty()
-                    ? notExists(stablePartAssignmentsKey) :
-                    
revision(stablePartAssignmentsKey).eq(stableEntry.revision());
+                // eq(revision(assignments.pending), 
retrievedAssignmentsPending.revision)
+                SimpleCondition con2 = 
revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());
 
-            // eq(revision(assignments.pending), 
retrievedAssignmentsPending.revision)
-            SimpleCondition con2 = 
revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());
+                // eq(revision(assignments.switch.reduce), 
retrievedAssignmentsSwitchReduce.revision)
+                SimpleCondition con3 = switchReduceEntry.empty()
+                        ? notExists(switchReduceKey) : 
revision(switchReduceKey).eq(switchReduceEntry.revision());
 
-            // eq(revision(assignments.switch.reduce), 
retrievedAssignmentsSwitchReduce.revision)
-            SimpleCondition con3 = switchReduceEntry.empty()
-                    ? notExists(switchReduceKey) : 
revision(switchReduceKey).eq(switchReduceEntry.revision());
+                // eq(revision(assignments.switch.append), 
retrievedAssignmentsSwitchAppend.revision)
+                SimpleCondition con4 = switchAppendEntry.empty()
+                        ? notExists(switchAppendKey) : 
revision(switchAppendKey).eq(switchAppendEntry.revision());
 
-            // eq(revision(assignments.switch.append), 
retrievedAssignmentsSwitchAppend.revision)
-            SimpleCondition con4 = switchAppendEntry.empty()
-                    ? notExists(switchAppendKey) : 
revision(switchAppendKey).eq(switchAppendEntry.revision());
+                // All conditions combined with AND operator.
+                Condition retryPreconditions = and(con1, and(con2, and(con3, 
con4)));
 
-            // All conditions combined with AND operator.
-            Condition retryPreconditions = and(con1, and(con2, and(con3, 
con4)));
+                long catalogTimestamp = pendingAssignments.timestamp();
 
-            long catalogTimestamp = pendingAssignments.timestamp();
+                Assignments newStableAssignments = 
Assignments.of(stableFromRaft, catalogTimestamp);
 
-            Assignments newStableAssignments = Assignments.of(stableFromRaft, 
catalogTimestamp);
+                Operation assignmentChainChangeOp = 
handleAssignmentsChainChange(
+                        assignmentsChainKey,
+                        assignmentsChainEntry,
+                        pendingAssignments,
+                        newStableAssignments,
+                        configurationTerm,
+                        configurationIndex
+                );
 
-            Operation assignmentChainChangeOp = handleAssignmentsChainChange(
-                    assignmentsChainKey,
-                    assignmentsChainEntry,
-                    pendingAssignments,
-                    newStableAssignments,
-                    configurationTerm,
-                    configurationIndex
-            );
+                Update successCase;
+                Update failCase;
 
-            Update successCase;
-            Update failCase;
-
-            byte[] stableFromRaftByteArray = newStableAssignments.toBytes();
-            byte[] additionByteArray = 
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingAddition, 
catalogTimestamp));
-            byte[] reductionByteArray = 
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingReduction, 
catalogTimestamp));
-            byte[] switchReduceByteArray = 
Assignments.toBytes(calculatedSwitchReduce, catalogTimestamp);
-            byte[] switchAppendByteArray = 
Assignments.toBytes(calculatedSwitchAppend, catalogTimestamp);
-
-            if (!calculatedSwitchAppend.isEmpty()) {
-                successCase = ops(
-                        put(stablePartAssignmentsKey, stableFromRaftByteArray),
-                        put(pendingPartAssignmentsKey, additionByteArray),
-                        put(switchReduceKey, switchReduceByteArray),
-                        put(switchAppendKey, switchAppendByteArray),
-                        assignmentChainChangeOp
-                ).yield(SWITCH_APPEND_SUCCESS);
-                failCase = ops().yield(SWITCH_APPEND_FAIL);
-            } else if (!calculatedSwitchReduce.isEmpty()) {
-                successCase = ops(
-                        put(stablePartAssignmentsKey, stableFromRaftByteArray),
-                        put(pendingPartAssignmentsKey, reductionByteArray),
-                        put(switchReduceKey, switchReduceByteArray),
-                        put(switchAppendKey, switchAppendByteArray),
-                        assignmentChainChangeOp
-                ).yield(SWITCH_REDUCE_SUCCESS);
-                failCase = ops().yield(SWITCH_REDUCE_FAIL);
-            } else {
-                Condition con5;
-                if (plannedEntry.value() != null) {
-                    // eq(revision(partition.assignments.planned), 
plannedEntry.revision)
-                    con5 = 
revision(plannedPartAssignmentsKey).eq(plannedEntry.revision());
+                byte[] stableFromRaftByteArray = 
newStableAssignments.toBytes();
+                byte[] additionByteArray = 
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingAddition, 
catalogTimestamp));
+                byte[] reductionByteArray = 
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingReduction, 
catalogTimestamp));
+                byte[] switchReduceByteArray = 
Assignments.toBytes(calculatedSwitchReduce, catalogTimestamp);
+                byte[] switchAppendByteArray = 
Assignments.toBytes(calculatedSwitchAppend, catalogTimestamp);
 
+                if (!calculatedSwitchAppend.isEmpty()) {
                     successCase = ops(
                             put(stablePartAssignmentsKey, 
stableFromRaftByteArray),
-                            put(pendingPartAssignmentsKey, 
AssignmentsQueue.toBytes(Assignments.fromBytes(plannedEntry.value()))),
-                            remove(plannedPartAssignmentsKey),
+                            put(pendingPartAssignmentsKey, additionByteArray),
+                            put(switchReduceKey, switchReduceByteArray),
+                            put(switchAppendKey, switchAppendByteArray),
                             assignmentChainChangeOp
-                    ).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);
-
-                    failCase = ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
-                } else {
-                    // notExists(partition.assignments.planned)
-                    con5 = notExists(plannedPartAssignmentsKey);
-
+                    ).yield(SWITCH_APPEND_SUCCESS);
+                    failCase = ops().yield(SWITCH_APPEND_FAIL);
+                } else if (!calculatedSwitchReduce.isEmpty()) {
                     successCase = ops(
                             put(stablePartAssignmentsKey, 
stableFromRaftByteArray),
-                            remove(pendingPartAssignmentsKey),
+                            put(pendingPartAssignmentsKey, reductionByteArray),
+                            put(switchReduceKey, switchReduceByteArray),
+                            put(switchAppendKey, switchAppendByteArray),
                             assignmentChainChangeOp
-                    ).yield(FINISH_REBALANCE_SUCCESS);
-
-                    failCase = ops().yield(FINISH_REBALANCE_FAIL);
+                    ).yield(SWITCH_REDUCE_SUCCESS);
+                    failCase = ops().yield(SWITCH_REDUCE_FAIL);
+                } else {
+                    Condition con5;
+                    if (plannedEntry.value() != null) {
+                        // eq(revision(partition.assignments.planned), 
plannedEntry.revision)
+                        con5 = 
revision(plannedPartAssignmentsKey).eq(plannedEntry.revision());
+
+                        successCase = ops(
+                                put(stablePartAssignmentsKey, 
stableFromRaftByteArray),
+                                put(pendingPartAssignmentsKey, 
AssignmentsQueue.toBytes(Assignments.fromBytes(plannedEntry.value()))),
+                                remove(plannedPartAssignmentsKey),
+                                assignmentChainChangeOp
+                        ).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);
+
+                        failCase = 
ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
+                    } else {
+                        // notExists(partition.assignments.planned)
+                        con5 = notExists(plannedPartAssignmentsKey);
+
+                        successCase = ops(
+                                put(stablePartAssignmentsKey, 
stableFromRaftByteArray),
+                                remove(pendingPartAssignmentsKey),
+                                assignmentChainChangeOp
+                        ).yield(FINISH_REBALANCE_SUCCESS);
+
+                        failCase = ops().yield(FINISH_REBALANCE_FAIL);
+                    }
+
+                    retryPreconditions = and(retryPreconditions, con5);
                 }
 
-                retryPreconditions = and(retryPreconditions, con5);
-            }
+                Set<Assignment> finalCalculatedPendingAddition = 
calculatedPendingAddition;
+                return metaStorageMgr.invoke(iif(retryPreconditions, 
successCase, failCase)).thenCompose(statementResult -> {
+                    int res = statementResult.getAsInt();
 
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove 
synchronous wait
-            int res = metaStorageMgr.invoke(iif(retryPreconditions, 
successCase, failCase)).get().getAsInt();
+                    if (res < 0) {
+                        logSwitchFailure(res, stableFromRaft, 
tablePartitionId);
 
-            if (res < 0) {
-                switch (res) {
-                    case SWITCH_APPEND_FAIL:
-                        LOG.info("Rebalance keys changed while trying to 
update rebalance pending addition information. "
-                                        + "Going to retry 
[tablePartitionID={}, appliedPeers={}]",
-                                tablePartitionId, stableFromRaft
-                        );
-                        break;
-                    case SWITCH_REDUCE_FAIL:
-                        LOG.info("Rebalance keys changed while trying to 
update rebalance pending reduce information. "
-                                        + "Going to retry 
[tablePartitionID={}, appliedPeers={}]",
-                                tablePartitionId, stableFromRaft
+                        return doStableKeySwitch(
+                                stableFromRaft,
+                                tablePartitionId,
+                                configurationTerm,
+                                configurationIndex,
+                                calculateAssignmentsFn
                         );
-                        break;
-                    case SCHEDULE_PENDING_REBALANCE_FAIL:
-                    case FINISH_REBALANCE_FAIL:
-                        LOG.info("Rebalance keys changed while trying to 
update rebalance information. "
-                                        + "Going to retry 
[tablePartitionId={}, appliedPeers={}]",
-                                tablePartitionId, stableFromRaft
+                    } else {
+                        logSwitchSuccess(
+                                res,
+                                stableFromRaft,
+                                tablePartitionId,
+                                finalCalculatedPendingAddition,
+                                calculatedPendingReduction,
+                                plannedEntry
                         );
-                        break;
-                    default:
-                        assert false : res;
-                        break;
-                }
+                        return nullCompletedFuture();
+                    }
+                });
+            });
+        });
+    }
 
-                doStableKeySwitch(
-                        stableFromRaft,
-                        tablePartitionId,
-                        metaStorageMgr,
-                        configurationTerm,
-                        configurationIndex,
-                        calculateAssignmentsFn
+    private static void logSwitchFailure(int res, Set<Assignment> 
stableFromRaft, TablePartitionId tablePartitionId) {
+        switch (res) {
+            case SWITCH_APPEND_FAIL:
+                LOG.info("Rebalance keys changed while trying to update 
rebalance pending addition information. "
+                                + "Going to retry [tablePartitionID={}, 
appliedPeers={}]",
+                        tablePartitionId, stableFromRaft
                 );
+                break;
+            case SWITCH_REDUCE_FAIL:
+                LOG.info("Rebalance keys changed while trying to update 
rebalance pending reduce information. "
+                                + "Going to retry [tablePartitionID={}, 
appliedPeers={}]",
+                        tablePartitionId, stableFromRaft
+                );
+                break;
+            case SCHEDULE_PENDING_REBALANCE_FAIL:
+            case FINISH_REBALANCE_FAIL:
+                LOG.info("Rebalance keys changed while trying to update 
rebalance information. "
+                                + "Going to retry [tablePartitionId={}, 
appliedPeers={}]",
+                        tablePartitionId, stableFromRaft
+                );
+                break;
+            default:
+                assert false : res;
+                break;
+        }
+    }
 
-                return;
-            }
-
-            switch (res) {
-                case SWITCH_APPEND_SUCCESS:
-                    LOG.info("Rebalance finished. Going to schedule next 
rebalance with addition"
-                                    + " [tablePartitionId={}, appliedPeers={}, 
plannedPeers={}]",
-                            tablePartitionId, stableFromRaft, 
calculatedPendingAddition
-                    );
-                    break;
-                case SWITCH_REDUCE_SUCCESS:
-                    LOG.info("Rebalance finished. Going to schedule next 
rebalance with reduction"
-                                    + " [tablePartitionId={}, appliedPeers={}, 
plannedPeers={}]",
-                            tablePartitionId, stableFromRaft, 
calculatedPendingReduction
-                    );
-                    break;
-                case SCHEDULE_PENDING_REBALANCE_SUCCESS:
-                    LOG.info(
-                            "Rebalance finished. Going to schedule next 
rebalance [tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
-                            tablePartitionId, stableFromRaft, 
Assignments.fromBytes(plannedEntry.value()).nodes()
-                    );
-                    break;
-                case FINISH_REBALANCE_SUCCESS:
-                    LOG.info("Rebalance finished [tablePartitionId={}, 
appliedPeers={}]", tablePartitionId, stableFromRaft);
-                    break;
-
-                default:
-                    assert false : res;
-                    break;
-            }
-
-        } catch (InterruptedException | ExecutionException e) {
-            // TODO: IGNITE-14693
-            if (!hasCause(e, NodeStoppingException.class)) {
-                if (hasCause(e, TimeoutException.class)) {
-                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-25276 - handle this timeout 
properly.
-                    LOG.error("Unable to commit partition configuration to 
metastore: {}", e, tablePartitionId);
-                } else {
-                    String errorMessage = String.format("Unable to commit 
partition configuration to metastore: %s", tablePartitionId);
-                    failureProcessor.process(new FailureContext(e, 
errorMessage));
-                }
-            }
+    private static void logSwitchSuccess(
+            int res,
+            Set<Assignment> stableFromRaft,
+            TablePartitionId tablePartitionId,
+            Set<Assignment> calculatedPendingAddition,
+            Set<Assignment> calculatedPendingReduction,
+            Entry plannedEntry
+    ) {
+        switch (res) {
+            case SWITCH_APPEND_SUCCESS:
+                LOG.info("Rebalance finished. Going to schedule next rebalance 
with addition"
+                                + " [tablePartitionId={}, appliedPeers={}, 
plannedPeers={}]",
+                        tablePartitionId, stableFromRaft, 
calculatedPendingAddition
+                );
+                break;
+            case SWITCH_REDUCE_SUCCESS:
+                LOG.info("Rebalance finished. Going to schedule next rebalance 
with reduction"
+                                + " [tablePartitionId={}, appliedPeers={}, 
plannedPeers={}]",
+                        tablePartitionId, stableFromRaft, 
calculatedPendingReduction
+                );
+                break;
+            case SCHEDULE_PENDING_REBALANCE_SUCCESS:
+                LOG.info(
+                        "Rebalance finished. Going to schedule next rebalance 
[tablePartitionId={}, appliedPeers={}, plannedPeers={}]",
+                        tablePartitionId, stableFromRaft, 
Assignments.fromBytes(plannedEntry.value()).nodes()
+                );
+                break;
+            case FINISH_REBALANCE_SUCCESS:
+                LOG.info("Rebalance finished [tablePartitionId={}, 
appliedPeers={}]", tablePartitionId, stableFromRaft);
+                break;
+
+            default:
+                assert false : res;
+                break;
         }
     }
 
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
index c95314c8705..0dc5aeffece 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceRaftGroupEventsListener.java
@@ -41,10 +41,8 @@ import static 
org.apache.ignite.internal.util.ExceptionUtils.hasCause;
 
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -241,26 +239,19 @@ public class ZoneRebalanceRaftGroupEventsListener 
implements RaftGroupEventsList
         }
 
         try {
-            rebalanceScheduler.schedule(() -> {
+            Set<Assignment> stable = createAssignments(configuration);
+
+            rebalanceScheduler.execute(() -> {
                 if (!busyLock.enterBusy()) {
                     return;
                 }
 
                 try {
-                    Set<Assignment> stable = createAssignments(configuration);
-
-                    doStableKeySwitch(
-                            stable,
-                            zonePartitionId,
-                            metaStorageMgr,
-                            term,
-                            index,
-                            calculateAssignmentsFn
-                    );
+                    doStableKeySwitchWithExceptionHandling(stable, 
zonePartitionId, term, index, calculateAssignmentsFn);
                 } finally {
                     busyLock.leaveBusy();
                 }
-            }, 0, TimeUnit.MILLISECONDS);
+            });
         } finally {
             busyLock.leaveBusy();
         }
@@ -340,34 +331,56 @@ public class ZoneRebalanceRaftGroupEventsListener 
implements RaftGroupEventsList
     /**
      * Updates stable value with the new applied assignment.
      */
-    private void doStableKeySwitch(
+    private void doStableKeySwitchWithExceptionHandling(
             Set<Assignment> stableFromRaft,
             ZonePartitionId zonePartitionId,
-            MetaStorageManager metaStorageMgr,
             long configurationTerm,
             long configurationIndex,
             BiFunction<ZonePartitionId, Long, 
CompletableFuture<Set<Assignment>>> calculateAssignmentsFn
     ) {
-        try {
-            ByteArray pendingPartAssignmentsKey = 
pendingPartAssignmentsQueueKey(zonePartitionId);
-            ByteArray stablePartAssignmentsKey = 
stablePartAssignmentsKey(zonePartitionId);
-            ByteArray plannedPartAssignmentsKey = 
plannedPartAssignmentsKey(zonePartitionId);
-            ByteArray switchReduceKey = switchReduceKey(zonePartitionId);
-            ByteArray switchAppendKey = switchAppendKey(zonePartitionId);
-            ByteArray assignmentsChainKey = 
assignmentsChainKey(zonePartitionId);
-
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove 
synchronous wait
-            Map<ByteArray, Entry> values = metaStorageMgr.getAll(
-                    Set.of(
-                            plannedPartAssignmentsKey,
-                            pendingPartAssignmentsKey,
-                            stablePartAssignmentsKey,
-                            switchReduceKey,
-                            switchAppendKey,
-                            assignmentsChainKey
-                    )
-            ).get();
+        doStableKeySwitch(
+                stableFromRaft,
+                zonePartitionId,
+                configurationTerm,
+                configurationIndex,
+                calculateAssignmentsFn
+        ).whenComplete((res, ex) -> {
+            // TODO: IGNITE-14693
+            if (ex != null && !hasCause(ex, NodeStoppingException.class)) {
+                if (hasCause(ex, TimeoutException.class)) {
+                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-25276 - handle this timeout 
properly.
+                    LOG.error("Unable to commit partition configuration to 
metastore: {}", ex, zonePartitionId);
+                } else {
+                    String errorMessage = String.format("Unable to commit 
partition configuration to metastore: %s", zonePartitionId);
+                    failureProcessor.process(new FailureContext(ex, 
errorMessage));
+                }
+            }
+        });
+    }
 
+    private CompletableFuture<Void> doStableKeySwitch(
+            Set<Assignment> stableFromRaft,
+            ZonePartitionId zonePartitionId,
+            long configurationTerm,
+            long configurationIndex,
+            BiFunction<ZonePartitionId, Long, 
CompletableFuture<Set<Assignment>>> calculateAssignmentsFn
+    ) {
+        ByteArray pendingPartAssignmentsKey = 
pendingPartAssignmentsQueueKey(zonePartitionId);
+        ByteArray stablePartAssignmentsKey = 
stablePartAssignmentsKey(zonePartitionId);
+        ByteArray plannedPartAssignmentsKey = 
plannedPartAssignmentsKey(zonePartitionId);
+        ByteArray switchReduceKey = switchReduceKey(zonePartitionId);
+        ByteArray switchAppendKey = switchAppendKey(zonePartitionId);
+        ByteArray assignmentsChainKey = assignmentsChainKey(zonePartitionId);
+
+        Set<ByteArray> keysToGet = Set.of(
+                plannedPartAssignmentsKey,
+                pendingPartAssignmentsKey,
+                stablePartAssignmentsKey,
+                switchReduceKey,
+                switchAppendKey,
+                assignmentsChainKey
+        );
+        return metaStorageMgr.getAll(keysToGet).thenCompose(values -> {
             Entry stableEntry = values.get(stablePartAssignmentsKey);
             Entry pendingEntry = values.get(pendingPartAssignmentsKey);
             Entry plannedEntry = values.get(plannedPartAssignmentsKey);
@@ -384,201 +397,210 @@ public class ZoneRebalanceRaftGroupEventsListener 
implements RaftGroupEventsList
             Set<Assignment> retrievedPending = pendingAssignments.nodes();
 
             if (!retrievedPending.equals(stableFromRaft)) {
-                return;
+                return nullCompletedFuture();
             }
 
             // We wait for catalog metadata to be applied up to the provided 
timestamp, so it should be safe to use the timestamp.
-            Set<Assignment> calculatedAssignments = 
calculateAssignmentsFn.apply(zonePartitionId, pendingAssignments.timestamp())
-                    .get();
+            return calculateAssignmentsFn.apply(zonePartitionId, 
pendingAssignments.timestamp()).thenCompose(calculatedAssignments -> {
+                // Were reduced
+                Set<Assignment> reducedNodes = 
difference(retrievedSwitchReduce, stableFromRaft);
 
-            // Were reduced
-            Set<Assignment> reducedNodes = difference(retrievedSwitchReduce, 
stableFromRaft);
+                // Were added
+                Set<Assignment> addedNodes = difference(stableFromRaft, 
retrievedStable);
 
-            // Were added
-            Set<Assignment> addedNodes = difference(stableFromRaft, 
retrievedStable);
+                // For further reduction
+                Set<Assignment> calculatedSwitchReduce = 
difference(retrievedSwitchReduce, reducedNodes);
 
-            // For further reduction
-            Set<Assignment> calculatedSwitchReduce = 
difference(retrievedSwitchReduce, reducedNodes);
+                // For further addition
+                Set<Assignment> calculatedSwitchAppend = 
union(retrievedSwitchAppend, reducedNodes);
+                calculatedSwitchAppend = difference(calculatedSwitchAppend, 
addedNodes);
+                calculatedSwitchAppend = intersect(calculatedAssignments, 
calculatedSwitchAppend);
 
-            // For further addition
-            Set<Assignment> calculatedSwitchAppend = 
union(retrievedSwitchAppend, reducedNodes);
-            calculatedSwitchAppend = difference(calculatedSwitchAppend, 
addedNodes);
-            calculatedSwitchAppend = intersect(calculatedAssignments, 
calculatedSwitchAppend);
+                Set<Assignment> calculatedPendingReduction = 
difference(stableFromRaft, retrievedSwitchReduce);
 
-            Set<Assignment> calculatedPendingReduction = 
difference(stableFromRaft, retrievedSwitchReduce);
+                Set<Assignment> calculatedPendingAddition = 
union(stableFromRaft, reducedNodes);
+                calculatedPendingAddition = intersect(calculatedAssignments, 
calculatedPendingAddition);
 
-            Set<Assignment> calculatedPendingAddition = union(stableFromRaft, 
reducedNodes);
-            calculatedPendingAddition = intersect(calculatedAssignments, 
calculatedPendingAddition);
+                // eq(revision(assignments.stable), 
retrievedAssignmentsStable.revision)
+                SimpleCondition con1 = stableEntry.empty()
+                        ? notExists(stablePartAssignmentsKey) :
+                        
revision(stablePartAssignmentsKey).eq(stableEntry.revision());
 
-            // eq(revision(assignments.stable), 
retrievedAssignmentsStable.revision)
-            SimpleCondition con1 = stableEntry.empty()
-                    ? notExists(stablePartAssignmentsKey) :
-                    
revision(stablePartAssignmentsKey).eq(stableEntry.revision());
+                // eq(revision(assignments.pending), 
retrievedAssignmentsPending.revision)
+                SimpleCondition con2 = 
revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());
 
-            // eq(revision(assignments.pending), 
retrievedAssignmentsPending.revision)
-            SimpleCondition con2 = 
revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());
+                // eq(revision(assignments.switch.reduce), 
retrievedAssignmentsSwitchReduce.revision)
+                SimpleCondition con3 = switchReduceEntry.empty()
+                        ? notExists(switchReduceKey) : 
revision(switchReduceKey).eq(switchReduceEntry.revision());
 
-            // eq(revision(assignments.switch.reduce), 
retrievedAssignmentsSwitchReduce.revision)
-            SimpleCondition con3 = switchReduceEntry.empty()
-                    ? notExists(switchReduceKey) : 
revision(switchReduceKey).eq(switchReduceEntry.revision());
+                // eq(revision(assignments.switch.append), 
retrievedAssignmentsSwitchAppend.revision)
+                SimpleCondition con4 = switchAppendEntry.empty()
+                        ? notExists(switchAppendKey) : 
revision(switchAppendKey).eq(switchAppendEntry.revision());
 
-            // eq(revision(assignments.switch.append), 
retrievedAssignmentsSwitchAppend.revision)
-            SimpleCondition con4 = switchAppendEntry.empty()
-                    ? notExists(switchAppendKey) : 
revision(switchAppendKey).eq(switchAppendEntry.revision());
+                // All conditions combined with AND operator.
+                Condition retryPreconditions = and(con1, and(con2, and(con3, 
con4)));
 
-            // All conditions combined with AND operator.
-            Condition retryPreconditions = and(con1, and(con2, and(con3, 
con4)));
+                long catalogTimestamp = pendingAssignments.timestamp();
 
-            long catalogTimestamp = pendingAssignments.timestamp();
+                Assignments newStableAssignments = 
Assignments.of(stableFromRaft, catalogTimestamp);
 
-            Assignments newStableAssignments = Assignments.of(stableFromRaft, 
catalogTimestamp);
+                Operation assignmentChainChangeOp = 
handleAssignmentsChainChange(
+                        assignmentsChainKey,
+                        assignmentsChainEntry,
+                        pendingAssignments,
+                        newStableAssignments,
+                        configurationTerm,
+                        configurationIndex
+                );
 
-            Operation assignmentChainChangeOp = handleAssignmentsChainChange(
-                    assignmentsChainKey,
-                    assignmentsChainEntry,
-                    pendingAssignments,
-                    newStableAssignments,
-                    configurationTerm,
-                    configurationIndex
-            );
+                Update successCase;
+                Update failCase;
 
-            Update successCase;
-            Update failCase;
-
-            byte[] stableFromRaftByteArray = newStableAssignments.toBytes();
-            byte[] additionByteArray = 
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingAddition, 
catalogTimestamp));
-            byte[] reductionByteArray = 
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingReduction, 
catalogTimestamp));
-            byte[] switchReduceByteArray = 
Assignments.toBytes(calculatedSwitchReduce, catalogTimestamp);
-            byte[] switchAppendByteArray = 
Assignments.toBytes(calculatedSwitchAppend, catalogTimestamp);
-
-            if (!calculatedSwitchAppend.isEmpty()) {
-                successCase = ops(
-                        put(stablePartAssignmentsKey, stableFromRaftByteArray),
-                        put(pendingPartAssignmentsKey, additionByteArray),
-                        put(switchReduceKey, switchReduceByteArray),
-                        put(switchAppendKey, switchAppendByteArray),
-                        assignmentChainChangeOp
-                ).yield(SWITCH_APPEND_SUCCESS);
-                failCase = ops().yield(SWITCH_APPEND_FAIL);
-            } else if (!calculatedSwitchReduce.isEmpty()) {
-                successCase = ops(
-                        put(stablePartAssignmentsKey, stableFromRaftByteArray),
-                        put(pendingPartAssignmentsKey, reductionByteArray),
-                        put(switchReduceKey, switchReduceByteArray),
-                        put(switchAppendKey, switchAppendByteArray),
-                        assignmentChainChangeOp
-                ).yield(SWITCH_REDUCE_SUCCESS);
-                failCase = ops().yield(SWITCH_REDUCE_FAIL);
-            } else {
-                Condition con5;
-                if (plannedEntry.value() != null) {
-                    // eq(revision(partition.assignments.planned), 
plannedEntry.revision)
-                    con5 = 
revision(plannedPartAssignmentsKey).eq(plannedEntry.revision());
+                byte[] stableFromRaftByteArray = 
newStableAssignments.toBytes();
+                byte[] additionByteArray = 
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingAddition, 
catalogTimestamp));
+                byte[] reductionByteArray = 
AssignmentsQueue.toBytes(Assignments.of(calculatedPendingReduction, 
catalogTimestamp));
+                byte[] switchReduceByteArray = 
Assignments.toBytes(calculatedSwitchReduce, catalogTimestamp);
+                byte[] switchAppendByteArray = 
Assignments.toBytes(calculatedSwitchAppend, catalogTimestamp);
 
+                if (!calculatedSwitchAppend.isEmpty()) {
                     successCase = ops(
                             put(stablePartAssignmentsKey, 
stableFromRaftByteArray),
-                            put(pendingPartAssignmentsKey, 
AssignmentsQueue.toBytes(Assignments.fromBytes(plannedEntry.value()))),
-                            remove(plannedPartAssignmentsKey),
+                            put(pendingPartAssignmentsKey, additionByteArray),
+                            put(switchReduceKey, switchReduceByteArray),
+                            put(switchAppendKey, switchAppendByteArray),
                             assignmentChainChangeOp
-                    ).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);
-
-                    failCase = ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
-                } else {
-                    // notExists(partition.assignments.planned)
-                    con5 = notExists(plannedPartAssignmentsKey);
-
+                    ).yield(SWITCH_APPEND_SUCCESS);
+                    failCase = ops().yield(SWITCH_APPEND_FAIL);
+                } else if (!calculatedSwitchReduce.isEmpty()) {
                     successCase = ops(
                             put(stablePartAssignmentsKey, 
stableFromRaftByteArray),
-                            remove(pendingPartAssignmentsKey),
+                            put(pendingPartAssignmentsKey, reductionByteArray),
+                            put(switchReduceKey, switchReduceByteArray),
+                            put(switchAppendKey, switchAppendByteArray),
                             assignmentChainChangeOp
-                    ).yield(FINISH_REBALANCE_SUCCESS);
+                    ).yield(SWITCH_REDUCE_SUCCESS);
+                    failCase = ops().yield(SWITCH_REDUCE_FAIL);
+                } else {
+                    Condition con5;
+                    if (plannedEntry.value() != null) {
+                        // eq(revision(partition.assignments.planned), 
plannedEntry.revision)
+                        con5 = 
revision(plannedPartAssignmentsKey).eq(plannedEntry.revision());
+
+                        successCase = ops(
+                                put(stablePartAssignmentsKey, 
stableFromRaftByteArray),
+                                put(pendingPartAssignmentsKey, 
AssignmentsQueue.toBytes(Assignments.fromBytes(plannedEntry.value()))),
+                                remove(plannedPartAssignmentsKey),
+                                assignmentChainChangeOp
+                        ).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);
+
+                        failCase = 
ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
+                    } else {
+                        // notExists(partition.assignments.planned)
+                        con5 = notExists(plannedPartAssignmentsKey);
+
+                        successCase = ops(
+                                put(stablePartAssignmentsKey, 
stableFromRaftByteArray),
+                                remove(pendingPartAssignmentsKey),
+                                assignmentChainChangeOp
+                        ).yield(FINISH_REBALANCE_SUCCESS);
+
+                        failCase = ops().yield(FINISH_REBALANCE_FAIL);
+                    }
 
-                    failCase = ops().yield(FINISH_REBALANCE_FAIL);
+                    retryPreconditions = and(retryPreconditions, con5);
                 }
 
-                retryPreconditions = and(retryPreconditions, con5);
-            }
+                Set<Assignment> finalCalculatedPendingAddition = 
calculatedPendingAddition;
+                return metaStorageMgr.invoke(iif(retryPreconditions, 
successCase, failCase)).thenCompose(statementResult -> {
+                    int res = statementResult.getAsInt();
 
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove 
synchronous wait
-            int res = metaStorageMgr.invoke(iif(retryPreconditions, 
successCase, failCase)).get().getAsInt();
+                    if (res < 0) {
+                        logSwitchFailure(res, stableFromRaft, zonePartitionId);
 
-            if (res < 0) {
-                switch (res) {
-                    case SWITCH_APPEND_FAIL:
-                        LOG.info("Rebalance keys changed while trying to 
update rebalance pending addition information. "
-                                        + "Going to retry [zonePartitionId={}, 
appliedPeers={}]",
-                                zonePartitionId, stableFromRaft
-                        );
-                        break;
-                    case SWITCH_REDUCE_FAIL:
-                        LOG.info("Rebalance keys changed while trying to 
update rebalance pending reduce information. "
-                                        + "Going to retry [zonePartitionId={}, 
appliedPeers={}]",
-                                zonePartitionId, stableFromRaft
+                        return doStableKeySwitch(
+                                stableFromRaft,
+                                zonePartitionId,
+                                configurationTerm,
+                                configurationIndex,
+                                calculateAssignmentsFn
                         );
-                        break;
-                    case SCHEDULE_PENDING_REBALANCE_FAIL:
-                    case FINISH_REBALANCE_FAIL:
-                        LOG.info("Rebalance keys changed while trying to 
update rebalance information. "
-                                        + "Going to retry [zonePartitionId={}, 
appliedPeers={}]",
-                                zonePartitionId, stableFromRaft
+                    } else {
+                        logSwitchSuccess(
+                                res,
+                                stableFromRaft,
+                                zonePartitionId,
+                                finalCalculatedPendingAddition,
+                                calculatedPendingReduction,
+                                plannedEntry
                         );
-                        break;
-                    default:
-                        assert false : res;
-                        break;
-                }
+                        return nullCompletedFuture();
+                    }
+                });
+            });
+        });
+    }
 
-                doStableKeySwitch(
-                        stableFromRaft,
-                        zonePartitionId,
-                        metaStorageMgr,
-                        configurationTerm,
-                        configurationIndex,
-                        calculateAssignmentsFn
+    private static void logSwitchFailure(int res, Set<Assignment> 
stableFromRaft, ZonePartitionId zonePartitionId) {
+        switch (res) {
+            case SWITCH_APPEND_FAIL:
+                LOG.info("Rebalance keys changed while trying to update 
rebalance pending addition information. "
+                                + "Going to retry [zonePartitionId={}, 
appliedPeers={}]",
+                        zonePartitionId, stableFromRaft
                 );
+                break;
+            case SWITCH_REDUCE_FAIL:
+                LOG.info("Rebalance keys changed while trying to update 
rebalance pending reduce information. "
+                                + "Going to retry [zonePartitionId={}, 
appliedPeers={}]",
+                        zonePartitionId, stableFromRaft
+                );
+                break;
+            case SCHEDULE_PENDING_REBALANCE_FAIL:
+            case FINISH_REBALANCE_FAIL:
+                LOG.info("Rebalance keys changed while trying to update 
rebalance information. "
+                                + "Going to retry [zonePartitionId={}, 
appliedPeers={}]",
+                        zonePartitionId, stableFromRaft
+                );
+                break;
+            default:
+                assert false : res;
+                break;
+        }
+    }
 
-                return;
-            }
-
-            switch (res) {
-                case SWITCH_APPEND_SUCCESS:
-                    LOG.info("Rebalance finished. Going to schedule next 
rebalance with addition"
-                                    + " [zonePartitionId={}, appliedPeers={}, 
plannedPeers={}]",
-                            zonePartitionId, stableFromRaft, 
calculatedPendingAddition
-                    );
-                    break;
-                case SWITCH_REDUCE_SUCCESS:
-                    LOG.info("Rebalance finished. Going to schedule next 
rebalance with reduction"
-                                    + " [zonePartitionId={}, appliedPeers={}, 
plannedPeers={}]",
-                            zonePartitionId, stableFromRaft, 
calculatedPendingReduction
-                    );
-                    break;
-                case SCHEDULE_PENDING_REBALANCE_SUCCESS:
-                    LOG.info(
-                            "Rebalance finished. Going to schedule next 
rebalance [zonePartitionId={}, appliedPeers={}, plannedPeers={}]",
-                            zonePartitionId, stableFromRaft, 
Assignments.fromBytes(plannedEntry.value()).nodes()
-                    );
-                    break;
-                case FINISH_REBALANCE_SUCCESS:
-                    LOG.info("Rebalance finished [zonePartitionId={}, 
appliedPeers={}]", zonePartitionId, stableFromRaft);
-                    break;
-
-                default:
-                    assert false : res;
-                    break;
-            }
-
-        } catch (InterruptedException | ExecutionException e) {
-            // TODO: IGNITE-14693
-            if (!hasCause(e, NodeStoppingException.class)) {
-                if (hasCause(e, TimeoutException.class)) {
-                    // TODO: 
https://issues.apache.org/jira/browse/IGNITE-25276 - handle this timeout 
properly.
-                    LOG.error("Unable to commit partition configuration to 
metastore: {}", e, zonePartitionId);
-                } else {
-                    String errorMessage = String.format("Unable to commit 
partition configuration to metastore: %s", zonePartitionId);
-                    failureProcessor.process(new FailureContext(e, 
errorMessage));
-                }
-            }
+    private static void logSwitchSuccess(
+            int res,
+            Set<Assignment> stableFromRaft,
+            ZonePartitionId zonePartitionId,
+            Set<Assignment> calculatedPendingAddition,
+            Set<Assignment> calculatedPendingReduction,
+            Entry plannedEntry
+    ) {
+        switch (res) {
+            case SWITCH_APPEND_SUCCESS:
+                LOG.info("Rebalance finished. Going to schedule next rebalance 
with addition"
+                                + " [zonePartitionId={}, appliedPeers={}, 
plannedPeers={}]",
+                        zonePartitionId, stableFromRaft, 
calculatedPendingAddition
+                );
+                break;
+            case SWITCH_REDUCE_SUCCESS:
+                LOG.info("Rebalance finished. Going to schedule next rebalance 
with reduction"
+                                + " [zonePartitionId={}, appliedPeers={}, 
plannedPeers={}]",
+                        zonePartitionId, stableFromRaft, 
calculatedPendingReduction
+                );
+                break;
+            case SCHEDULE_PENDING_REBALANCE_SUCCESS:
+                LOG.info(
+                        "Rebalance finished. Going to schedule next rebalance 
[zonePartitionId={}, appliedPeers={}, plannedPeers={}]",
+                        zonePartitionId, stableFromRaft, 
Assignments.fromBytes(plannedEntry.value()).nodes()
+                );
+                break;
+            case FINISH_REBALANCE_SUCCESS:
+                LOG.info("Rebalance finished [zonePartitionId={}, 
appliedPeers={}]", zonePartitionId, stableFromRaft);
+                break;
+
+            default:
+                assert false : res;
+                break;
         }
     }
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 8ddaae06460..3709f098da1 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -255,7 +255,6 @@ public class PartitionReplicaLifecycleManager extends
     /**
      * This future completes on {@link #beforeNodeStop()} with {@link 
NodeStoppingException} before the {@link #busyLock} is blocked.
      */
-    // TODO: https://issues.apache.org/jira/browse/IGNITE-17592
     private final CompletableFuture<Void> stopReplicaLifecycleFuture = new 
CompletableFuture<>();
 
     /**

Reply via email to