This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a2b909f6345 IGNITE-25543 Implement retry mechanism for reset (#6053)
a2b909f6345 is described below

commit a2b909f63451dab3d3db321552ae6b6fcb75d7de
Author: Cyrill <[email protected]>
AuthorDate: Thu Nov 20 19:24:49 2025 +0300

    IGNITE-25543 Implement retry mechanism for reset (#6053)
    
    Co-authored-by: Kirill Sizov <[email protected]>
---
 .../ignite/internal/util/IgniteBusyLock.java       |  35 ++++++
 .../ignite/internal/util/IgniteSpinBusyLock.java   |   4 +-
 .../internal/util/IgniteStripedBusyLock.java       |   4 +-
 .../apache/ignite/internal/util/IgniteUtils.java   |  20 ++--
 .../rebalance/PartitionMover.java                  |   2 +-
 .../distributionzones/rebalance/RebalanceUtil.java |  11 --
 .../metastorage/impl/MetaStorageManagerImpl.java   |   2 +-
 .../impl/StandaloneMetaStorageManager.java         |  16 +++
 .../PartitionReplicaLifecycleManager.java          |  49 ++++----
 .../PartitionReplicaLifecycleManagerTest.java      |   3 +-
 .../internal/raft/rebalance/ExceptionUtils.java    |  43 +++++++
 .../raft/rebalance/RaftStaleUpdateException.java   |  30 +++++
 .../java/org/apache/ignite/internal/raft/Loza.java |   5 +-
 .../ignite/internal/raft/RaftGroupServiceImpl.java |   6 +
 .../internal/raft/server/impl/JraftServerImpl.java |   4 +-
 .../ItPlacementDriverReplicaSideTest.java          |   2 +-
 .../ignite/internal/replicator/ReplicaManager.java |  96 +++++++++++++--
 .../internal/replicator/ReplicaStateManager.java   |   2 +-
 .../internal/replicator/ReplicaManagerTest.java    |   2 +-
 .../ignite/internal/app/ThreadPoolsManager.java    |   6 +-
 .../ignite/distributed/ReplicaUnavailableTest.java |   2 +-
 .../internal/table/distributed/TableManager.java   |  13 +-
 .../distributed/TableManagerRecoveryTest.java      | 132 +++++++++++++++++----
 .../apache/ignite/distributed/ItTxTestCluster.java |   2 +-
 modules/transactions/build.gradle                  |   1 +
 .../disaster/DisasterRecoveryTestUtil.java         |   7 +-
 .../ItDisasterRecoveryReconfigurationTest.java     | 117 +++++++++++-------
 27 files changed, 478 insertions(+), 138 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteBusyLock.java
new file mode 100644
index 00000000000..b093bc25af3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteBusyLock.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+/**
+ * Synchronization aid to track "busy" state of a subsystem that owns it.
+ */
+public interface IgniteBusyLock {
+    /**
+     * Enters "busy" state.
+     *
+     * @return {@code true} if entered to busy state.
+     */
+    boolean enterBusy();
+
+    /**
+     * Leaves "busy" state.
+     */
+    void leaveBusy();
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
index 80dfb3bee19..692f04228d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
@@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit;
  *
  * @see IgniteSpinReadWriteLock
  */
-public class IgniteSpinBusyLock {
+public class IgniteSpinBusyLock implements IgniteBusyLock {
     /** Underlying read-write lock. */
     private final IgniteSpinReadWriteLock lock = new IgniteSpinReadWriteLock();
 
@@ -41,6 +41,7 @@ public class IgniteSpinBusyLock {
      *
      * @return {@code true} if entered to busy state.
      */
+    @Override
     public boolean enterBusy() {
         return !lock.writeLockedByCurrentThread() && lock.tryReadLock();
     }
@@ -57,6 +58,7 @@ public class IgniteSpinBusyLock {
     /**
      * Leaves "busy" state.
      */
+    @Override
     public void leaveBusy() {
         lock.readUnlock();
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedBusyLock.java
index 53dec8e2d4b..96e4141ee23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedBusyLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedBusyLock.java
@@ -21,7 +21,7 @@ package org.apache.ignite.internal.util;
  * Busy lock implementation based on {@link IgniteStripedReadWriteLock}. API 
is analogous to {@link IgniteSpinBusyLock}, but without the
  * ability to unblock it.
  */
-public class IgniteStripedBusyLock {
+public class IgniteStripedBusyLock implements IgniteBusyLock {
     /** Underlying read-write lock. */
     private final IgniteStripedReadWriteLock lock = new 
IgniteStripedReadWriteLock();
 
@@ -32,6 +32,7 @@ public class IgniteStripedBusyLock {
      *
      * @return {@code true} if entered to busy state.
      */
+    @Override
     public boolean enterBusy() {
         if (!lock.readLock().tryLock()) {
             return false;
@@ -49,6 +50,7 @@ public class IgniteStripedBusyLock {
     /**
      * Leaves "busy" state.
      */
+    @Override
     public void leaveBusy() {
         lock.readLock().unlock();
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 896af3ba7b5..b32ec82a249 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -873,9 +873,9 @@ public class IgniteUtils {
      * @param fn Function to run.
      * @param <T> Type of returned value from {@code fn}.
      * @return Result of the provided function.
-     * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if {@link IgniteSpinBusyLock#enterBusy()} failed.
+     * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if {@link IgniteBusyLock#enterBusy()} failed.
      */
-    public static <T> T inBusyLock(IgniteSpinBusyLock busyLock, Supplier<T> 
fn) {
+    public static <T> T inBusyLock(IgniteBusyLock busyLock, Supplier<T> fn) {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
@@ -892,9 +892,9 @@ public class IgniteUtils {
      * @param busyLock Component's busy lock.
      * @param fn Function to run.
      * @return Result of the provided function.
-     * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if {@link IgniteSpinBusyLock#enterBusy()} failed.
+     * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if {@link IgniteBusyLock#enterBusy()} failed.
      */
-    public static int inBusyLock(IgniteSpinBusyLock busyLock, IntSupplier fn) {
+    public static int inBusyLock(IgniteBusyLock busyLock, IntSupplier fn) {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
@@ -910,9 +910,9 @@ public class IgniteUtils {
      *
      * @param busyLock Component's busy lock.
      * @param fn Runnable to run.
-     * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if {@link IgniteSpinBusyLock#enterBusy()} failed.
+     * @throws IgniteInternalException with cause {@link 
NodeStoppingException} if {@link IgniteBusyLock#enterBusy()} failed.
      */
-    public static void inBusyLock(IgniteSpinBusyLock busyLock, Runnable fn) {
+    public static void inBusyLock(IgniteBusyLock busyLock, Runnable fn) {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
@@ -930,9 +930,9 @@ public class IgniteUtils {
      * @param busyLock Component's busy lock.
      * @param fn Function to run.
      * @return Future returned from the {@code fn}, or future with the {@link 
NodeStoppingException} if
-     *         {@link IgniteSpinBusyLock#enterBusy()} failed or with runtime 
exception/error while executing the {@code fn}.
+     *         {@link IgniteBusyLock#enterBusy()} failed or with runtime 
exception/error while executing the {@code fn}.
      */
-    public static <T> CompletableFuture<T> inBusyLockAsync(IgniteSpinBusyLock 
busyLock, Supplier<CompletableFuture<T>> fn) {
+    public static <T> CompletableFuture<T> inBusyLockAsync(IgniteBusyLock 
busyLock, Supplier<CompletableFuture<T>> fn) {
         if (!busyLock.enterBusy()) {
             return failedFuture(new NodeStoppingException());
         }
@@ -947,13 +947,13 @@ public class IgniteUtils {
     }
 
     /**
-     * Method that runs the provided {@code fn} in {@code busyLock} if {@link 
IgniteSpinBusyLock#enterBusy()} succeed. Otherwise it just
+     * Method that runs the provided {@code fn} in {@code busyLock} if {@link 
IgniteBusyLock#enterBusy()} succeed. Otherwise it just
      * silently returns.
      *
      * @param busyLock Component's busy lock.
      * @param fn Runnable to run.
      */
-    public static void inBusyLockSafe(IgniteSpinBusyLock busyLock, Runnable 
fn) {
+    public static void inBusyLockSafe(IgniteBusyLock busyLock, Runnable fn) {
         if (!busyLock.enterBusy()) {
             return;
         }
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
index be5fc2b2cd7..05d6bb66a4f 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.distributionzones.rebalance;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.recoverable;
+import static 
org.apache.ignite.internal.raft.rebalance.ExceptionUtils.recoverable;
 import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 74b03b41394..1d938e18408 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -581,17 +581,6 @@ public class RebalanceUtil {
         return Integer.parseInt(toStringWithoutPrefix(key, prefix.length));
     }
 
-    /**
-     * Checks if an error is recoverable, so we can retry a rebalance intent.
-     *
-     * @param t The throwable.
-     * @return {@code True} if this is a recoverable exception.
-     */
-    public static boolean recoverable(Throwable t) {
-        // As long as we don't have a general failure handler, we assume that 
all errors are recoverable.
-        return true;
-    }
-
     /**
      * Removes nodes from set of nodes.
      *
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 97ba74e3eae..5b9bb88e17e 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -1136,7 +1136,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager, MetastorageGr
     /**
      * Saves processed Meta Storage revision to the {@link #appliedRevision}.
      */
-    private void onRevisionApplied(long revision) {
+    protected void onRevisionApplied(long revision) {
         appliedRevision = revision;
     }
 
diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 381b4eb04a4..0eb1cc990f7 100644
--- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -95,6 +95,9 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl {
     @Nullable
     private Consumer<Boolean> afterInvokeInterceptor;
 
+    @Nullable
+    private Consumer<Long> onRevisionAppliedInterceptor;
+
     /** Creates standalone MetaStorage manager. */
     public static StandaloneMetaStorageManager create() {
         return create(TEST_NODE_NAME);
@@ -239,6 +242,19 @@ public class StandaloneMetaStorageManager extends MetaStorageManagerImpl {
         this.afterInvokeInterceptor = afterInvokeInterceptor;
     }
 
+    public void setOnRevisionAppliedInterceptor(@Nullable Consumer<Long> 
onRevisionAppliedInterceptor) {
+        this.onRevisionAppliedInterceptor = onRevisionAppliedInterceptor;
+    }
+
+    @Override
+    protected void onRevisionApplied(long revision) {
+        super.onRevisionApplied(revision);
+
+        if (onRevisionAppliedInterceptor != null) {
+            onRevisionAppliedInterceptor.accept(revision);
+        }
+    }
+
     @Override
     public CompletableFuture<Boolean> invoke(Condition cond, Operation 
success, Operation failure) {
         return super.invoke(cond, success, failure)
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index b53c7d1a315..c98a7a1ae03 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -22,7 +22,6 @@ import static java.util.Collections.emptySet;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
@@ -1306,6 +1305,13 @@ public class PartitionReplicaLifecycleManager extends
                         zonePartitionId,
                         pendingAssignments.nodes(),
                         revision);
+            }).whenComplete((v, ex) -> {
+                if (ex != null) {
+                    LOG.debug(
+                            "Failed to handle change pending assignment event "
+                                    + "[zonePartitionId={}, 
stableAssignments={}, pendingAssignments={}, revision={}, isRecovery={}].",
+                            zonePartitionId, stableAssignments, 
pendingAssignments, revision, isRecovery, ex);
+                }
             });
         } finally {
             busyLock.leaveBusy();
@@ -1350,22 +1356,7 @@ public class PartitionReplicaLifecycleManager extends
         // For regular pending assignments we use (old) stable set, so that 
none of new nodes would be able to propose itself as a leader.
         // For forced assignments, we should do the same thing, but only for 
the subset of stable set that is alive right now. Dead nodes
         // are excluded. It is calculated precisely as an intersection between 
forced assignments and (old) stable assignments.
-        Assignments computedStableAssignments;
-
-        // TODO: https://issues.apache.org/jira/browse/IGNITE-22600 remove the 
second condition
-        //  when we will have a proper handling of empty stable assignments
-        if (stableAssignments == null || stableAssignments.nodes().isEmpty()) {
-            // This condition can only pass if all stable nodes are dead, and 
we start new raft group from scratch.
-            // In this case new initial configuration must match new forced 
assignments.
-            computedStableAssignments = 
Assignments.forced(pendingAssignmentsNodes, pendingAssignments.timestamp());
-        } else if (pendingAssignmentsAreForced) {
-            // In case of forced assignments we need to remove nodes that are 
present in the stable set but are missing from the
-            // pending set. Such operation removes dead stable nodes from the 
resulting stable set, which guarantees that we will
-            // have a live majority.
-            computedStableAssignments = pendingAssignments;
-        } else {
-            computedStableAssignments = stableAssignments;
-        }
+        Assignments computedStableAssignments = 
getComputedStableAssignments(stableAssignments, pendingAssignments);
 
         CompletableFuture<?> localServicesStartFuture;
 
@@ -1388,11 +1379,8 @@ public class PartitionReplicaLifecycleManager extends
                     false
             );
         } else if (pendingAssignmentsAreForced && localAssignmentInPending != 
null) {
-            localServicesStartFuture = runAsync(() -> {
-                inBusyLock(busyLock,
-                        () -> replicaMgr.resetPeers(replicaGrpId, 
fromAssignments(computedStableAssignments.nodes()), revision)
-                );
-            }, ioExecutor);
+            localServicesStartFuture =
+                    replicaMgr.resetWithRetry(replicaGrpId, 
fromAssignments(computedStableAssignments.nodes()), revision);
         } else {
             localServicesStartFuture = nullCompletedFuture();
         }
@@ -1442,6 +1430,23 @@ public class PartitionReplicaLifecycleManager extends
                 }), ioExecutor);
     }
 
+    private static Assignments getComputedStableAssignments(@Nullable 
Assignments stableAssignments, Assignments pendingAssignments) {
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-22600 remove the 
second condition
+        //  when we will have a proper handling of empty stable assignments
+        if (stableAssignments == null || stableAssignments.nodes().isEmpty()) {
+            // This condition can only pass if all stable nodes are dead, and 
we start new raft group from scratch.
+            // In this case new initial configuration must match new forced 
assignments.
+            return Assignments.forced(pendingAssignments.nodes(), 
pendingAssignments.timestamp());
+        } else if (pendingAssignments.force()) {
+            // In case of forced assignments we need to remove nodes that are 
present in the stable set but are missing from the
+            // pending set. Such operation removes dead stable nodes from the 
resulting stable set, which guarantees that we will
+            // have a live majority.
+            return pendingAssignments;
+        } else {
+            return stableAssignments;
+        }
+    }
+
     private CatalogZoneDescriptor zoneDescriptorAt(int zoneId, long timestamp) 
{
         Catalog catalog = catalogService.activeCatalog(timestamp);
         assert catalog != null : "Catalog is not available at " + 
nullableHybridTimestamp(timestamp);
diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index 9a6493d7e1c..d1cccbc6e64 100644
--- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -56,7 +56,6 @@ import static org.mockito.Mockito.when;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -178,7 +177,7 @@ class PartitionReplicaLifecycleManagerTest extends BaseIgniteAbstractTest {
             @Mock DataStorageManager dataStorageManager,
             @Mock CatalogService catalogService,
             @Mock OutgoingSnapshotsManager outgoingSnapshotsManager,
-            @InjectExecutorService ExecutorService executorService,
+            @InjectExecutorService ScheduledExecutorService executorService,
             @InjectExecutorService ScheduledExecutorService 
scheduledExecutorService,
             @InjectConfiguration SystemDistributedConfiguration 
systemDistributedConfiguration
 
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
new file mode 100644
index 00000000000..5b9e836cec6
--- /dev/null
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.rebalance;
+
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+
+import org.apache.ignite.internal.lang.ComponentStoppingException;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+
+/**
+ * Helper class for exception handling.
+ */
+public class ExceptionUtils {
+
+    /**
+     * Checks if an error is recoverable, so we can retry a rebalance intent.
+     *
+     * @param t The throwable.
+     * @return {@code True} if this is a recoverable exception.
+     */
+    public static boolean recoverable(Throwable t) {
+        if (hasCause(t, NodeStoppingException.class, 
ComponentStoppingException.class, RaftStaleUpdateException.class)) {
+            return false;
+        }
+
+        return true;
+    }
+}
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/RaftStaleUpdateException.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/RaftStaleUpdateException.java
new file mode 100644
index 00000000000..d75150403a1
--- /dev/null
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/RaftStaleUpdateException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.rebalance;
+
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+
+/**
+ * Special type of exception used when proposed data is stale and will not be 
applied.
+ */
+public class RaftStaleUpdateException extends IgniteInternalCheckedException {
+
+    public RaftStaleUpdateException(String message) {
+        super(message);
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 046115cccd4..18515206f9f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.rpc.impl.ActionRequestInterceptor;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
@@ -643,11 +644,11 @@ public class Loza implements RaftManager {
      * @param peersAndLearners New node configuration.
      * @param sequenceToken Sequence token.
      */
-    public void resetPeers(RaftNodeId raftNodeId, PeersAndLearners 
peersAndLearners, long sequenceToken) {
+    public Status resetPeers(RaftNodeId raftNodeId, PeersAndLearners 
peersAndLearners, long sequenceToken) {
         LOG.warn("Reset peers for raft group {}, new configuration is {}, 
sequence token {}",
                 raftNodeId, peersAndLearners, sequenceToken);
 
-        raftServer.resetPeers(raftNodeId, peersAndLearners, sequenceToken);
+        return raftServer.resetPeers(raftNodeId, peersAndLearners, 
sequenceToken);
     }
 
     /**
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 424d02cbad0..d264f700527 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.RecipientLeftException;
 import org.apache.ignite.internal.network.TopologyEventHandler;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.rebalance.RaftStaleUpdateException;
 import org.apache.ignite.internal.raft.service.LeaderWithTerm;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -874,6 +875,11 @@ public class RaftGroupServiceImpl implements RaftGroupService {
                 break;
             }
 
+            case ESTALE:
+                fut.completeExceptionally(new 
RaftStaleUpdateException(resp.errorMsg()));
+
+                break;
+
             default:
                 fut.completeExceptionally(new RaftException(error, 
resp.errorMsg()));
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 84099e9694a..f4e375a883b 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -705,14 +705,14 @@ public class JraftServerImpl implements RaftServer {
      * @param raftNodeId Raft node ID.
      * @param peersAndLearners New node configuration.
      */
-    public void resetPeers(RaftNodeId raftNodeId, PeersAndLearners 
peersAndLearners, long sequenceToken) {
+    public Status resetPeers(RaftNodeId raftNodeId, PeersAndLearners 
peersAndLearners, long sequenceToken) {
         RaftGroupService raftGroupService = nodes.get(raftNodeId);
 
         List<PeerId> peerIds = 
peersAndLearners.peers().stream().map(PeerId::fromPeer).collect(toList());
 
         List<PeerId> learnerIds = 
peersAndLearners.learners().stream().map(PeerId::fromPeer).collect(toList());
 
-        raftGroupService.getRaftNode().resetPeers(new Configuration(peerIds, 
learnerIds, sequenceToken));
+        return raftGroupService.getRaftNode().resetPeers(new 
Configuration(peerIds, learnerIds, sequenceToken));
     }
 
     /**
diff --git a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index b3916cacbae..98f87cfa53a 100644
--- a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++ b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -237,7 +237,7 @@ public class ItPlacementDriverReplicaSideTest extends IgniteAbstractTest {
                     raftManager,
                     partitionsConfigurer,
                     new VolatileLogStorageFactoryCreator(nodeName, 
workDir.resolve("volatile-log-spillout")),
-                    ForkJoinPool.commonPool(),
+                    Executors.newSingleThreadScheduledExecutor(),
                     replicaGrpId -> nullCompletedFuture(),
                     ForkJoinPool.commonPool()
             );
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index d6089f26a2c..1d92e8ff09f 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.replicator;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.delayedExecutor;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toSet;
 import static java.util.stream.Collectors.toUnmodifiableSet;
 import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.raft.rebalance.ExceptionUtils.recoverable;
 import static 
org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
 import static 
org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
@@ -38,6 +40,7 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.isCompletedSucc
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shouldSwitchToRequestsExecutor;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
@@ -98,6 +101,7 @@ import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.LogStorageBudgetView;
+import org.apache.ignite.internal.raft.rebalance.RaftStaleUpdateException;
 import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -127,8 +131,11 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.util.TrackerClosedException;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.error.RaftError;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
+import org.jetbrains.annotations.VisibleForTesting;
 
 /**
  * Replica manager maintains {@link Replica} instances on an Ignite node.
@@ -183,7 +190,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
     /** Creator for {@link 
org.apache.ignite.internal.raft.storage.LogStorageFactory} for volatile tables. 
*/
     private final LogStorageFactoryCreator volatileLogStorageFactoryCreator;
 
-    private final Executor replicaStartStopExecutor;
+    private final ScheduledExecutorService replicaLifecycleExecutor;
 
     /** Raft command marshaller for raft server endpoints starting. */
     private final Marshaller raftCommandsMarshaller;
@@ -243,7 +250,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      * @param partitionRaftConfigurer Configurer of raft options on raft group 
creation.
      * @param volatileLogStorageFactoryCreator Creator for {@link 
org.apache.ignite.internal.raft.storage.LogStorageFactory} for
      *      volatile tables.
-     * @param replicaStartStopExecutor Executor for asynchronous replicas 
lifecycle management.
+     * @param replicaLifecycleExecutor Executor for asynchronous replicas 
lifecycle management.
      * @param getPendingAssignmentsSupplier The supplier of pending 
assignments for rebalance failover purposes.
      * @param throttledLogExecutor Executor to clean up the throttled logger 
cache.
      */
@@ -262,7 +269,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             RaftManager raftManager,
             RaftGroupOptionsConfigurer partitionRaftConfigurer,
             LogStorageFactoryCreator volatileLogStorageFactoryCreator,
-            Executor replicaStartStopExecutor,
+            ScheduledExecutorService replicaLifecycleExecutor,
             Function<ReplicationGroupId, 
CompletableFuture<VersionedAssignments>> getPendingAssignmentsSupplier,
             Executor throttledLogExecutor
     ) {
@@ -282,10 +289,10 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         this.raftManager = raftManager;
         this.partitionRaftConfigurer = partitionRaftConfigurer;
         this.getPendingAssignmentsSupplier = getPendingAssignmentsSupplier;
-        this.replicaStartStopExecutor = replicaStartStopExecutor;
+        this.replicaLifecycleExecutor = replicaLifecycleExecutor;
 
         this.replicaStateManager = new ReplicaStateManager(
-                replicaStartStopExecutor,
+                replicaLifecycleExecutor,
                 placementDriver,
                 this,
                 failureProcessor
@@ -788,9 +795,84 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      * @param peersAndLearners New node configuration.
      * @param sequenceToken Sequence token.
      */
+    @VisibleForTesting
     public void resetPeers(ReplicationGroupId replicaGrpId, PeersAndLearners peersAndLearners, long sequenceToken) {
         RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new Peer(localNodeConsistentId));
-        ((Loza) raftManager).resetPeers(raftNodeId, peersAndLearners, sequenceToken);
+        Loza loza = (Loza) raftManager;
+        Status status = loza.resetPeers(raftNodeId, peersAndLearners, sequenceToken);
+
+        if (status.isOk()) {
+            return;
+        }
+
+        // Stale configuration change will not be retried: a configuration newer than the requested one has already
+        // been applied, so the failure is reported as a RaftStaleUpdateException, which resetWithRetry does not retry.
+        if (status.getRaftError() == RaftError.ESTALE) {
+            throw new IgniteException(INTERNAL_ERR, new RaftStaleUpdateException(status.getErrorMsg()));
+        }
+
+        // Any other failure status is not converted into an exception, so callers (including the retry loop) proceed
+        // as if the reset succeeded. Log it instead of dropping it silently.
+        // NOTE(review): confirm whether transient statuses (e.g. a configuration change already in progress) should be
+        // thrown as recoverable exceptions so that resetWithRetry retries them.
+        LOG.warn("Reset peers returned a non-OK status [groupId={}, status={}].", replicaGrpId, status);
+    }
+
+    /**
+     * Performs a {@code resetPeers} operation on the local raft node, retrying on recoverable failures.
+     *
+     * <p>Retries are performed as long as the received exception is recoverable, which is any exception other than
+     * node or component stopping exceptions or raft stale exceptions caused by a stale sequence token. Consecutive
+     * attempts are separated by a delay.
+     *
+     * <p>It is safe to call this method without chaining on the returned future: every attempt either performs the
+     * reset as expected, when no other reconfiguration happened on the cluster, or fails with an exception because
+     * someone has already applied a configuration that is newer than the one we want to reset to.
+     *
+     * @param replicaGrpId Replication group ID.
+     * @param peersAndLearners New peers and learners.
+     * @param sequenceToken Sequence token.
+     * @return Future that completes when a reset attempt succeeds, or completes exceptionally with the error of the
+     *      first attempt that failed with an unrecoverable exception.
+     * @see org.apache.ignite.internal.raft.rebalance.ExceptionUtils#recoverable(Throwable)
+     */
+    public CompletableFuture<Void> resetWithRetry(ReplicationGroupId replicaGrpId, PeersAndLearners peersAndLearners, long sequenceToken) {
+        var result = new CompletableFuture<Void>();
+
+        resetWithRetry(replicaGrpId, peersAndLearners, result, sequenceToken, 1);
+
+        return result;
+    }
+
+    /**
+     * Runs a single {@code resetPeers} attempt on the replica lifecycle executor and, on a recoverable failure,
+     * schedules the next one; the outcome is reported through {@code result}.
+     *
+     * @param replicaGrpId Replication group ID.
+     * @param peersAndLearners New peers and learners.
+     * @param result Future completed with the final outcome of the retry loop.
+     * @param sequenceToken Sequence token.
+     * @param iteration 1-based number of the current attempt.
+     */
+    private void resetWithRetry(
+            ReplicationGroupId replicaGrpId,
+            PeersAndLearners peersAndLearners,
+            CompletableFuture<Void> result,
+            long sequenceToken,
+            int iteration
+    ) {
+        // Nobody waits for the outcome anymore (e.g. the caller cancelled the future): stop retrying.
+        if (result.isDone()) {
+            return;
+        }
+
+        if (iteration % 1000 == 0) {
+            LOG.info("Retrying reset [iter={}, groupId={}, peersAndLearners={}]", iteration, replicaGrpId, peersAndLearners);
+        }
+
+        CompletableFuture<Void> attempt;
+
+        try {
+            attempt = runAsync(() -> inBusyLock(busyLock, () -> {
+                // NOTE(review): with assertions disabled, a replica that is not started makes Loza.resetPeers fail with
+                // an NPE, which is treated as recoverable and therefore retried indefinitely.
+                assert isReplicaStarted(replicaGrpId) : "The local node is outside of the replication group: " + replicaGrpId;
+
+                resetPeers(replicaGrpId, peersAndLearners, sequenceToken);
+            }), replicaLifecycleExecutor);
+        } catch (java.util.concurrent.RejectedExecutionException e) {
+            // The executor refused the task (e.g. it is shut down). When this runs inside a scheduled retry, the exception
+            // would otherwise be lost and the result future would never complete.
+            result.completeExceptionally(e);
+
+            return;
+        }
+
+        attempt.whenComplete((unused, ex) -> {
+            if (ex == null) {
+                result.complete(null);
+            } else if (recoverable(ex)) {
+                LOG.debug("Failed to reset peers. Retrying [groupId={}]. ", replicaGrpId, ex);
+
+                resetWithRetryThrottling(replicaGrpId, peersAndLearners, result, sequenceToken, iteration);
+            } else {
+                result.completeExceptionally(ex);
+            }
+        });
+    }
+
+    /**
+     * Schedules the next {@code resetPeers} attempt on the replica lifecycle executor after a fixed delay.
+     *
+     * <p>This is invoked from a future completion callback, where a thrown exception would be swallowed by the discarded
+     * dependent stage. Therefore, if the executor refuses the task (for example, because it has been shut down),
+     * {@code result} is completed exceptionally so that callers waiting on it are not left hanging.
+     *
+     * @param replicaGrpId Replication group ID.
+     * @param peersAndLearners New peers and learners.
+     * @param result Future completed with the final outcome of the retry loop.
+     * @param sequenceToken Sequence token.
+     * @param iteration Number of the attempt that has just failed.
+     */
+    private void resetWithRetryThrottling(
+            ReplicationGroupId replicaGrpId,
+            PeersAndLearners peersAndLearners,
+            CompletableFuture<Void> result,
+            long sequenceToken,
+            int iteration
+    ) {
+        try {
+            replicaLifecycleExecutor.schedule(
+                    () -> resetWithRetry(replicaGrpId, peersAndLearners, result, sequenceToken, iteration + 1),
+                    500,
+                    TimeUnit.MILLISECONDS
+            );
+        } catch (java.util.concurrent.RejectedExecutionException e) {
+            result.completeExceptionally(e);
+        }
+    }
 
     private RaftGroupOptions groupOptionsForPartition(boolean 
isVolatileStorage, @Nullable SnapshotStorageFactory snapshotFactory) {
@@ -906,7 +988,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                     }
 
                     return true;
-                }, replicaStartStopExecutor);
+                }, replicaLifecycleExecutor);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
index aa1c3ec35a3..066e33319a5 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
@@ -186,7 +186,7 @@ class ReplicaStateManager {
                     assert forcedAssignments.force() :
                             format("Unexpected assignments to force 
[assignments={}, groupId={}].", forcedAssignments, groupId);
 
-                    replicaManager.resetPeers(groupId, 
fromAssignments(forcedAssignments.nodes()), revision);
+                    replicaManager.resetWithRetry(groupId, 
fromAssignments(forcedAssignments.nodes()), revision);
                 }
 
                 // Telling the caller that the replica is started.
diff --git 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index 9a83d085ca8..69ae5c362b6 100644
--- 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++ 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -136,7 +136,7 @@ public class ReplicaManagerTest extends 
BaseIgniteAbstractTest {
                 raftManager,
                 partitionsConfigurer,
                 volatileLogStorageFactoryCreator,
-                ForkJoinPool.commonPool(),
+                Executors.newSingleThreadScheduledExecutor(),
                 replicaGrpId -> nullCompletedFuture(),
                 ForkJoinPool.commonPool()
         );
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
index e64e9bfedd8..3f8f9bb5c4e 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
@@ -51,7 +51,7 @@ public class ThreadPoolsManager implements IgniteComponent {
      * Separate executor for IO operations like partition storage 
initialization, partition raft group meta data persisting,
      * index storage creation...
      */
-    private final ExecutorService tableIoExecutor;
+    private final ScheduledExecutorService tableIoExecutor;
 
     /**
      * Executor on which partition operations are executed. Might do storage 
reads and writes (so it's expected to execute disk I/O).
@@ -73,7 +73,7 @@ public class ThreadPoolsManager implements IgniteComponent {
     public ThreadPoolsManager(String nodeName, MetricManager metricManager) {
         int cpus = Runtime.getRuntime().availableProcessors();
 
-        tableIoExecutor = Executors.newFixedThreadPool(
+        tableIoExecutor = Executors.newScheduledThreadPool(
                 Math.min(cpus * 3, 25),
                 IgniteThreadFactory.create(nodeName, "tableManager-io", LOG, 
STORAGE_READ, STORAGE_WRITE)
         );
@@ -130,7 +130,7 @@ public class ThreadPoolsManager implements IgniteComponent {
     /**
      * Returns executor used to create/destroy storages, start partition Raft groups, create index storages...
+     *
+     * <p>The executor is a {@link ScheduledExecutorService}, so delayed tasks can be scheduled on it as well.
      */
-    public ExecutorService tableIoExecutor() {
+    public ScheduledExecutorService tableIoExecutor() {
         return tableIoExecutor;
     }
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 45e416252a9..d862c0cf09b 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -219,7 +219,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
                 raftManager,
                 RaftGroupOptionsConfigurer.EMPTY,
                 view -> new LocalLogStorageFactory(),
-                ForkJoinPool.commonPool(),
+                Executors.newSingleThreadScheduledExecutor(),
                 replicaGrpId -> nullCompletedFuture(),
                 ForkJoinPool.commonPool()
         );
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 255117828c6..03e9855cf4d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -2333,7 +2333,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         };
     }
 
-    private CompletableFuture<Void> handleChangePendingAssignmentEvent(
+    protected CompletableFuture<Void> handleChangePendingAssignmentEvent(
             Entry pendingAssignmentsEntry,
             long revision,
             boolean isRecovery
@@ -2482,12 +2482,11 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                             computedStableAssignments
                     ), ioExecutor);
         } else if (pendingAssignmentsAreForced && localAssignmentInPending != 
null) {
-            localServicesStartFuture = runAsync(() -> inBusyLock(busyLock, () 
-> {
-                assert replicaMgr.isReplicaStarted(replicaGrpId) : "The local 
node is outside of the replication group: " + replicaGrpId;
-
-                // Sequence token for data partitions is MS revision.
-                replicaMgr.resetPeers(replicaGrpId, 
fromAssignments(computedStableAssignments.nodes()), revision);
-            }), ioExecutor);
+            localServicesStartFuture = replicaMgr.resetWithRetry(
+                    replicaGrpId,
+                    fromAssignments(computedStableAssignments.nodes()),
+                    revision
+            );
         } else {
             localServicesStartFuture = nullCompletedFuture();
         }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index f92f200d26b..534d1b8fad4 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -22,8 +22,12 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static 
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
+import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
 import static 
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignments;
+import static 
org.apache.ignite.internal.partitiondistribution.PendingAssignmentsCalculator.pendingAssignmentsCalculator;
+import static 
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
 import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
 import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex;
 import static 
org.apache.ignite.internal.table.TableTestUtils.createSimpleTable;
@@ -48,6 +52,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -68,6 +73,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Supplier;
@@ -75,6 +81,7 @@ import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.components.LogSyncer;
 import org.apache.ignite.internal.components.LongJvmPauseDetector;
 import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
@@ -85,6 +92,7 @@ import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
+import 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
 import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
@@ -96,7 +104,6 @@ import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
 import org.apache.ignite.internal.manager.ComponentContext;
-import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.impl.MetaStorageRevisionListenerRegistry;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
@@ -111,17 +118,25 @@ import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import 
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.partitiondistribution.Assignment;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
+import org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
 import 
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
 import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
+import 
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
 import org.apache.ignite.internal.replicator.Replica;
 import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
 import org.apache.ignite.internal.schema.AlwaysSyncedSchemaSyncService;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -150,6 +165,7 @@ import 
org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.sql.IgniteSql;
 import org.jetbrains.annotations.Nullable;
@@ -170,16 +186,16 @@ import org.mockito.quality.Strictness;
 @MockitoSettings(strictness = Strictness.LENIENT)
 public class TableManagerRecoveryTest extends IgniteAbstractTest {
     private static final String NODE_NAME = "testNode1";
+    private static final String NODE_NAME2 = "testNode2";
     private static final String ZONE_NAME = "zone1";
     private static final String TABLE_NAME = "testTable";
     private static final String INDEX_NAME = "testIndex1";
     private static final String INDEXED_COLUMN_NAME = "columnName";
     private static final int PARTITIONS = 8;
-    private static final InternalClusterNode node = new ClusterNodeImpl(
-            UUID.randomUUID(),
-            NODE_NAME,
-            new NetworkAddress("127.0.0.1", 2245)
-    );
+    private static final InternalClusterNode node =
+            new ClusterNodeImpl(UUID.randomUUID(), NODE_NAME, new 
NetworkAddress("127.0.0.1", 2245));
+    private static final InternalClusterNode node2 =
+            new ClusterNodeImpl(UUID.randomUUID(), NODE_NAME2, new 
NetworkAddress("127.0.0.1", 2246));
     private static final long WAIT_TIMEOUT = SECONDS.toMillis(10);
 
     // Configuration
@@ -197,7 +213,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
     // Table manager dependencies.
     private SchemaManager sm;
     private CatalogManager catalogManager;
-    private MetaStorageManagerImpl metaStorageManager;
+    private StandaloneMetaStorageManager metaStorageManager;
     private TxStateRocksDbSharedStorage sharedTxStateStorage;
     private TableManager tableManager;
     @InjectExecutorService(threadCount = 4, allowedOperations = {STORAGE_READ, 
STORAGE_WRITE})
@@ -212,7 +228,6 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
     private IndexMetaStorage indexMetaStorage;
 
     // Table internal components
-    @Mock
     private ReplicaManager replicaMgr;
     @Mock
     private LogSyncer logSyncer;
@@ -333,6 +348,60 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 .loadTableListenerToZoneReplica(any(), anyInt(), any(), any(), 
any(), eq(true));
     }
 
+    /**
+     * Checks that a forced reset of peers is retried after recoverable failures: the first two {@code resetPeers}
+     * attempts throw, the third one succeeds, so exactly three invocations are expected.
+     */
+    @Test
+    public void testResetPeersRetry() {
+        createSimpleTable(catalogManager, TABLE_NAME);
+
+        CatalogTableDescriptor table = catalogManager.activeCatalog(clock.nowLong()).table(DEFAULT_SCHEMA_NAME, TABLE_NAME);
+
+        TablePartitionId tablePartitionId = new TablePartitionId(table.id(), 0);
+        ZonePartitionId zonePartitionId = new ZonePartitionId(table.zoneId(), 0);
+
+        long assignmentsTimestamp = catalogManager.catalog(catalogManager.latestCatalogVersion()).time();
+
+        // Stable assignments contain both nodes, while the forced target keeps only the local node, which is
+        // the case where the local node resets peers of the group.
+        AssignmentsQueue assignmentsQueue = pendingAssignmentsCalculator()
+                .stable(Assignments.of(Set.of(Assignment.forPeer(node.name()), Assignment.forPeer(node2.name())), assignmentsTimestamp))
+                .target(Assignments.forced(Set.of(Assignment.forPeer(node.name())), assignmentsTimestamp))
+                .toQueue();
+
+        doReturn(true).when(replicaMgr).isReplicaStarted(any());
+        doReturn(completedFuture(mock(Replica.class, RETURNS_DEEP_STUBS))).when(replicaMgr).replica(any());
+
+        // Two failing attempts followed by a successful one.
+        doAnswer(invocation -> {
+            throw new IllegalStateException("Test exception");
+        })
+                .doAnswer(invocation -> {
+                    throw new IgniteException(0);
+                })
+                .doAnswer(invocation -> null)
+                .when(replicaMgr).resetPeers(any(), any(), anyLong());
+
+        // This is to wait until handleChangePendingAssignments is finished.
+        CompletableFuture<Void> assignmentsHandled = new CompletableFuture<>();
+        metaStorageManager.setOnRevisionAppliedInterceptor(rev -> assignmentsHandled.complete(null));
+
+        // With colocation enabled pending assignments are keyed by zone partition, otherwise by table partition.
+        CompletableFuture<Void> putReset = metaStorageManager.put(
+                colocationEnabled()
+                        ? ZoneRebalanceUtil.pendingPartAssignmentsQueueKey(zonePartitionId)
+                        : pendingPartAssignmentsQueueKey(tablePartitionId),
+                assignmentsQueue.toBytes()
+        );
+        assertThat(putReset, willCompleteSuccessfully());
+
+        assertThat(assignmentsHandled, willCompleteSuccessfully());
+
+        verify(replicaMgr, times(3)).resetPeers(any(), any(), anyLong());
+    }
+
     /**
      * Creates and starts TableManage and dependencies.
      */
@@ -364,20 +433,44 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
         when(topologyService.localMember()).thenReturn(node);
         when(distributionZoneManager.dataNodes(any(), anyInt(), 
anyInt())).thenReturn(completedFuture(Set.of(NODE_NAME)));
 
+        PlacementDriver placementDriver = new TestPlacementDriver(node);
+        ClockService clockService = new TestClockService(clock);
+        FailureProcessor failureProcessor = mock(FailureProcessor.class);
+
+        replicaMgr = spy(new ReplicaManager(
+                NODE_NAME,
+                clusterService,
+                mock(ClusterManagementGroupManager.class, RETURNS_DEEP_STUBS),
+                clockService,
+                Set.of(),
+                placementDriver,
+                partitionOperationsExecutor,
+                () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+                failureProcessor,
+                null,
+                mock(TopologyAwareRaftGroupServiceFactory.class),
+                rm,
+                RaftGroupOptionsConfigurer.EMPTY,
+                new VolatileLogStorageFactoryCreator(NODE_NAME, 
workDir.resolve("volatile-log-spillout")),
+                Executors.newSingleThreadScheduledExecutor(),
+                replicaGrpId -> nullCompletedFuture(),
+                ForkJoinPool.commonPool()
+        ));
+
         doReturn(nullCompletedFuture())
                 
.when(replicaMgr).startReplica(any(RaftGroupEventsListener.class), any(), 
anyBoolean(), any(), any(), any(), any(), any());
 
         ZonePartitionReplicaListener zonePartitionReplicaListener = 
mock(ZonePartitionReplicaListener.class);
         Replica replica = mock(Replica.class);
-        when(replicaMgr.startReplica(any(ReplicationGroupId.class), any(), 
any(), any(), any(), any(), anyBoolean(), any(), any()))
-                .then(invocation -> {
-                    partitionReplicaLifecycleManager
-                            .zonePartitionResources(invocation.getArgument(0))
-                            .replicaListenerFuture()
-                            .complete(zonePartitionReplicaListener);
 
-                    return completedFuture(replica);
-                });
+        doAnswer(invocation -> {
+            partitionReplicaLifecycleManager
+                    .zonePartitionResources(invocation.getArgument(0))
+                    .replicaListenerFuture()
+                    .complete(zonePartitionReplicaListener);
+
+            return completedFuture(replica);
+        }).when(replicaMgr).startReplica(any(ReplicationGroupId.class), any(), 
any(), any(), any(), any(), anyBoolean(), any(), any());
 
         doReturn(trueCompletedFuture()).when(replicaMgr).stopReplica(any());
         doAnswer(invocation -> {
@@ -408,11 +501,8 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
 
         var revisionUpdater = new 
MetaStorageRevisionListenerRegistry(metaStorageManager);
 
-        PlacementDriver placementDriver = new TestPlacementDriver(node);
-
         lowWatermark = new TestLowWatermark();
         lowWatermark.updateWithoutNotify(savedWatermark);
-        ClockService clockService = new TestClockService(clock);
 
         indexMetaStorage = new IndexMetaStorage(catalogManager, lowWatermark, 
metaStorageManager);
 
@@ -424,8 +514,6 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
 
         sm = new SchemaManager(revisionUpdater, catalogManager);
 
-        FailureProcessor failureProcessor = mock(FailureProcessor.class);
-
         sharedTxStateStorage = new TxStateRocksDbSharedStorage(
                 node.name(),
                 workDir.resolve("tx-state"),
@@ -538,6 +626,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                             sm,
                             indexMetaStorage,
                             sharedTxStateStorage,
+                            replicaMgr,
                             partitionReplicaLifecycleManager,
                             tableManager
                     );
@@ -564,6 +653,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                                 new ComponentContext(),
                                 tableManager,
                                 partitionReplicaLifecycleManager,
+                                replicaMgr,
                                 sharedTxStateStorage,
                                 dsm,
                                 sm,
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 991b2b5e5d7..2a6447fffaa 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -505,7 +505,7 @@ public class ItTxTestCluster {
                     raftSrv,
                     partitionRaftConfigurer,
                     new VolatileLogStorageFactoryCreator(nodeName, 
workDir.resolve("volatile-log-spillout")),
-                    ForkJoinPool.commonPool(),
+                    Executors.newSingleThreadScheduledExecutor(),
                     replicaGrpId -> nullCompletedFuture(),
                     ForkJoinPool.commonPool()
             );
diff --git a/modules/transactions/build.gradle 
b/modules/transactions/build.gradle
index ea2bd4285a7..fae253133f6 100644
--- a/modules/transactions/build.gradle
+++ b/modules/transactions/build.gradle
@@ -95,6 +95,7 @@ dependencies {
     integrationTestImplementation testFixtures(project(':ignite-runner'))
     integrationTestImplementation testFixtures(project(':ignite-catalog'))
     integrationTestImplementation libs.netty.transport
+    integrationTestImplementation libs.awaitility
 
     testFixturesImplementation project(':ignite-configuration')
     testFixturesImplementation project(':ignite-core')
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
index dad4fa57c70..a492367a781 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -95,7 +96,11 @@ class DisasterRecoveryTestUtil {
                     ByteArray stablePartAssignmentsKey = 
stablePartAssignmentsKey(partId);
 
                     for (Operation operation : operations) {
-                        ByteArray opKey = new 
ByteArray(toByteArray(operation.key()));
+                        ByteBuffer operationKey = operation.key();
+                        if (operationKey == null) {
+                            continue;
+                        }
+                        ByteArray opKey = new 
ByteArray(toByteArray(operationKey));
 
                         if (operation.type() == OperationType.PUT && 
opKey.equals(stablePartAssignmentsKey)) {
                             boolean equals = 
blockedAssignments.equals(Assignments.fromBytes(toByteArray(operation.value())));
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 430246fb27c..2395c0e8353 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -34,6 +34,7 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsQueueKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.plannedPartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
+import static 
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
 import static 
org.apache.ignite.internal.replicator.configuration.ReplicationConfigurationSchema.DEFAULT_IDLE_SAFE_TIME_PROP_DURATION;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
@@ -42,6 +43,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.empty;
@@ -60,6 +62,7 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -565,7 +568,6 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
      *     <li>We execute "resetPartitions" and expect that data from node 0 
will be available after that.</li>
      * </ul>
      */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-23783";)
     @Test
     @ZoneParams(nodes = 6, replicas = 3, partitions = 1)
     public void testIncompleteRebalanceAfterResetPartitions() throws Exception 
{
@@ -632,6 +634,15 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
 
         assertPendingAssignments(node0, partId, assignmentsPending);
 
+        // Wait until the other nodes have switched to the new configuration.
+        // If the leader is stopped before the group has switched to the new configuration,
+        // the other nodes will never progress, as they're still on the old configuration.
+        // In our case, the old configuration is the first one seen: [1,4,5].
+        // In other words, we need to wait for:
+        // [StateMachineAdapter] onConfigurationCommitted: idrrt_tirarp_0,idrrt_tirarp_3,idrrt_tirarp_1.
+        List<String> expectedPeers = List.of(node(0).name(), node(1).name(), 
node(3).name());
+        assertConfigurationApplied(node0, partId, expectedPeers);
+
         stopNode(1);
         waitForScale(node0, 3);
 
@@ -809,7 +820,6 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
      * disaster recovery API, but with manual flag set to false. We expect 
that in this replica factor won't be restored.
      * In this test, assignments will be (1, 3, 4), according to {@link 
RendezvousDistributionFunction}.
      */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-23783";)
     @Test
     @ZoneParams(nodes = 5, replicas = 3, partitions = 1)
     void testAutomaticRebalanceIfMajorityIsLost() throws Exception {
@@ -1004,15 +1014,18 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
                 node(6).name());
         assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
 
-        Assignments allAssignments = Assignments.of(Set.of(
-                Assignment.forPeer(node(0).name()),
-                Assignment.forLearner(node(1).name()),
-                Assignment.forPeer(node(2).name()),
-                Assignment.forPeer(node(3).name()),
-                Assignment.forPeer(node(4).name()),
-                Assignment.forLearner(node(5).name()),
-                Assignment.forPeer(node(6).name())
-        ), timestamp);
+        CatalogZoneDescriptor zone = 
node0.catalogManager().activeCatalog(node0.clock().nowLong()).zone(zoneName);
+        Collection<String> dataNodes = new HashSet<>();
+        for (int i = 0; i < 7; i++) {
+            dataNodes.add(node(i).name());
+        }
+
+        logger().info("Zone {}", zone);
+
+        Set<Assignment> allAssignmentsSet = calculateAssignmentForPartition(
+                dataNodes, partId, zone.partitions(), zone.replicas(), 
zone.consensusGroupSize());
+
+        Assignments allAssignments = Assignments.of(allAssignmentsSet, 
timestamp);
 
         assertStableAssignments(node0, partId, allAssignments);
 
@@ -1153,15 +1166,18 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
                 node(6).name());
         assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
 
-        Assignments allAssignments = Assignments.of(Set.of(
-                Assignment.forLearner(node(0).name()),
-                Assignment.forPeer(node(1).name()),
-                Assignment.forPeer(node(2).name()),
-                Assignment.forPeer(node(3).name()),
-                Assignment.forPeer(node(4).name()),
-                Assignment.forLearner(node(5).name()),
-                Assignment.forPeer(node(6).name())
-        ), timestamp);
+        CatalogZoneDescriptor zone = 
node0.catalogManager().activeCatalog(node0.clock().nowLong()).zone(zoneName);
+        Collection<String> dataNodes = new HashSet<>();
+        for (int i = 0; i < 7; i++) {
+            dataNodes.add(node(i).name());
+        }
+
+        logger().info("Zone {}", zone);
+
+        Set<Assignment> allAssignmentsSet = calculateAssignmentForPartition(
+                dataNodes, partId, zone.partitions(), zone.replicas(), 
zone.consensusGroupSize());
+
+        Assignments allAssignments = Assignments.of(allAssignmentsSet, 
timestamp);
 
         assertStableAssignments(node0, partId, allAssignments);
         // Write data(1) to all seven nodes.
@@ -1452,7 +1468,23 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         }, 10_000));
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-24160";)
+    private void assertConfigurationApplied(IgniteImpl node0, int partId, 
List<String> peers) {
+        await().atMost(10, SECONDS)
+                .until(() -> {
+                    RaftGroupConfigurationConverter 
raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
+
+                    TableManager tableManager = 
node0.distributedTableManager();
+
+                    RaftGroupConfiguration raftGroupConfiguration = 
raftGroupConfigurationConverter.fromBytes(
+                            
tableManager.cachedTable(TABLE_NAME).internalTable().storage().getMvPartition(partId)
+                                    .committedGroupConfiguration()
+                    );
+
+                    logger().info("Configuration Peers: {}", 
raftGroupConfiguration.peers());
+                    return peers.containsAll(raftGroupConfiguration.peers());
+                });
+    }
+
     @Test
     @ZoneParams(nodes = 7, replicas = 7, partitions = 1, consistencyMode = 
ConsistencyMode.HIGH_AVAILABILITY)
     void testAssignmentsChainUpdatedOnAutomaticReset() throws Exception {
@@ -1470,17 +1502,18 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
 
         assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
 
-        Assignments allAssignments = Assignments.of(Set.of(
-                Assignment.forPeer(node(0).name()),
-                Assignment.forPeer(node(1).name()),
-                Assignment.forPeer(node(2).name()),
-                Assignment.forPeer(node(3).name()),
-                Assignment.forPeer(node(4).name()),
-                Assignment.forPeer(node(5).name()),
-                Assignment.forPeer(node(6).name())
-        ), timestamp);
+        CatalogZoneDescriptor zone = 
node0.catalogManager().activeCatalog(node0.clock().nowLong()).zone(zoneName);
+        Collection<String> dataNodes = new HashSet<>();
+        for (int i = 0; i < 7; i++) {
+            dataNodes.add(node(i).name());
+        }
 
-        assertStableAssignments(node0, partId, allAssignments);
+        logger().info("Zone {}", zone);
+
+        Set<Assignment> allAssignmentsSet = calculateAssignmentForPartition(
+                dataNodes, partId, zone.partitions(), zone.replicas(), 
zone.consensusGroupSize());
+
+        Assignments allAssignments = Assignments.of(allAssignmentsSet, 
timestamp);
 
         // Assignments chain is equal to the stable assignments.
         assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(allAssignments));
@@ -1536,7 +1569,6 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         assertAssignmentsChain(node0, partId, 
AssignmentsChain.of(allAssignments, link2Assignments, link3Assignments));
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-25285";)
     @Test
     @ZoneParams(nodes = 7, replicas = 7, partitions = 1, consistencyMode = 
ConsistencyMode.HIGH_AVAILABILITY)
     void testSecondResetRewritesUnfinishedFirstPhaseReset() throws Exception {
@@ -1556,15 +1588,18 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
 
         assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
 
-        Assignments allAssignments = Assignments.of(Set.of(
-                Assignment.forPeer(node(0).name()),
-                Assignment.forPeer(node(1).name()),
-                Assignment.forPeer(node(2).name()),
-                Assignment.forPeer(node(3).name()),
-                Assignment.forPeer(node(4).name()),
-                Assignment.forPeer(node(5).name()),
-                Assignment.forPeer(node(6).name())
-        ), timestamp);
+        CatalogZoneDescriptor zone = 
node0.catalogManager().activeCatalog(node0.clock().nowLong()).zone(zoneName);
+        Collection<String> dataNodes = new HashSet<>();
+        for (int i = 0; i < 7; i++) {
+            dataNodes.add(node(i).name());
+        }
+
+        logger().info("Zone {}", zone);
+
+        Set<Assignment> allAssignmentsSet = calculateAssignmentForPartition(
+                dataNodes, partId, zone.partitions(), zone.replicas(), 
zone.consensusGroupSize());
+
+        Assignments allAssignments = Assignments.of(allAssignmentsSet, 
timestamp);
 
         assertStableAssignments(node0, partId, allAssignments);
 

Reply via email to