This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 52491546348 IGNITE-26337 Fix restart with cleanup tests (#6699)
52491546348 is described below

commit 52491546348d33721bf9239e9615ba7d07f386e1
Author: Mirza Aliev <[email protected]>
AuthorDate: Sat Oct 11 17:34:18 2025 +0400

    IGNITE-26337 Fix restart with cleanup tests (#6699)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   7 +-
 .../restart/ItRestartPartitionsTest.java           |  27 +-
 modules/platforms/cpp/ignite/common/error_codes.h  |   3 +-
 modules/platforms/cpp/ignite/odbc/common_types.cpp |   3 +-
 .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs |   7 +-
 .../handler/IgniteInternalExceptionHandler.java    |   3 +-
 ...ControllerRestartPartitionsWithCleanupTest.java |  80 ++-
 .../ItHighAvailablePartitionsRecoveryTest.java     |   1 +
 .../disaster/DisasterRecoveryManager.java          | 657 ++++++++++++---------
 .../disaster/GroupUpdateRequestHandler.java        | 291 ++++-----
 .../disaster/ManualGroupRestartRequest.java        | 134 +++--
 .../disaster/exceptions/IllegalNodesException.java |  30 +
 .../exceptions/NotEnoughAliveNodesException.java   |  31 +
 13 files changed, 738 insertions(+), 536 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index d11eed81505..3c897015f38 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -727,8 +727,11 @@ public class ErrorGroups {
         /** Error while returning partition states. */
         public static final int CLUSTER_NOT_IDLE_ERR = 
RECOVERY_ERR_GROUP.registerErrorCode((short) 4);
 
-        /** Error while restarting the cluster with clean up. */
-        public static final int RESTART_WITH_CLEAN_UP_ERR = 
RECOVERY_ERR_GROUP.registerErrorCode((short) 5);
+        /** Error when not enough alive nodes to perform restart with clean 
up. */
+        public static final int NOT_ENOUGH_ALIVE_NODES_ERR = 
RECOVERY_ERR_GROUP.registerErrorCode((short) 5);
+
+        /** Error when node names are not in valid set. */
+        public static final int ILLEGAL_NODES_SET_ERR = 
RECOVERY_ERR_GROUP.registerErrorCode((short) 6);
     }
 
     /** Embedded API error group. */
diff --git 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/ItRestartPartitionsTest.java
 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/ItRestartPartitionsTest.java
index f795455d85d..5f0e0b32187 100644
--- 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/ItRestartPartitionsTest.java
+++ 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/ItRestartPartitionsTest.java
@@ -46,11 +46,15 @@ public abstract class ItRestartPartitionsTest extends 
CliIntegrationTest {
 
     @BeforeAll
     public void createTables() {
-        sql(String.format("CREATE ZONE \"%s\" (REPLICAS %s) storage profiles 
['%s']", ZONE, 3, DEFAULT_AIPERSIST_PROFILE_NAME));
+        sql(String.format("CREATE ZONE \"%s\" (REPLICAS %s) storage profiles 
['%s']",
+                ZONE,
+                initialNodes(),
+                DEFAULT_AIPERSIST_PROFILE_NAME
+        ));
+
         sql(String.format("CREATE TABLE PUBLIC.\"%s\" (id INT PRIMARY KEY, val 
INT) ZONE \"%s\"", TABLE_NAME, ZONE));
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
     @Test
     public void testRestartAllPartitions() {
         execute(CLUSTER_URL_OPTION, NODE_URL,
@@ -165,12 +169,15 @@ public abstract class ItRestartPartitionsTest extends 
CliIntegrationTest {
         ));
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26638")
     public void testRestartAllPartitionsWithCleanup() {
+        String nodeName = CLUSTER.aliveNode().name();
+
         execute(CLUSTER_URL_OPTION, NODE_URL,
                 RECOVERY_TABLE_NAME_OPTION, QUALIFIED_TABLE_NAME,
                 RECOVERY_ZONE_NAME_OPTION, ZONE,
+                RECOVERY_NODE_NAMES_OPTION, nodeName,
                 RECOVERY_WITH_CLEANUP_OPTION
         );
 
@@ -178,13 +185,16 @@ public abstract class ItRestartPartitionsTest extends 
CliIntegrationTest {
         assertOutputContains("Successfully restarted partitions.");
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26638")
     public void testRestartSpecifiedPartitionsWithCleanup() {
+        String nodeName = CLUSTER.aliveNode().name();
+
         execute(CLUSTER_URL_OPTION, NODE_URL,
                 RECOVERY_TABLE_NAME_OPTION, QUALIFIED_TABLE_NAME,
                 RECOVERY_ZONE_NAME_OPTION, ZONE,
                 RECOVERY_PARTITION_IDS_OPTION, "1,2",
+                RECOVERY_NODE_NAMES_OPTION, nodeName,
                 RECOVERY_WITH_CLEANUP_OPTION
         );
 
@@ -192,7 +202,6 @@ public abstract class ItRestartPartitionsTest extends 
CliIntegrationTest {
         assertOutputContains("Successfully restarted partitions.");
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
     @Test
     public void testRestartPartitionsByNodesWithCleanup() {
         String nodeNames = CLUSTER.runningNodes()
@@ -208,8 +217,8 @@ public abstract class ItRestartPartitionsTest extends 
CliIntegrationTest {
                 RECOVERY_WITH_CLEANUP_OPTION
         );
 
-        assertErrOutputIsEmpty();
-        assertOutputContains("Successfully restarted partitions.");
+        assertOutputIsEmpty();
+        assertErrOutputContains("Only one node name should be specified for 
the operation.");
     }
 
     @Test
@@ -227,7 +236,7 @@ public abstract class ItRestartPartitionsTest extends 
CliIntegrationTest {
                 RECOVERY_WITH_CLEANUP_OPTION
         );
 
-        assertErrOutputIsEmpty();
-        assertOutputContains("Successfully restarted partitions.");
+        assertOutputIsEmpty();
+        assertErrOutputContains("Only one node name should be specified for 
the operation.");
     }
 }
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h 
b/modules/platforms/cpp/ignite/common/error_codes.h
index 00915319d30..39db509458c 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -220,7 +220,8 @@ enum class code : underlying_t {
     NODES_NOT_FOUND = 0x140002,
     PARTITION_STATE = 0x140003,
     CLUSTER_NOT_IDLE = 0x140004,
-    RESTART_WITH_CLEAN_UP = 0x140005,
+    NOT_ENOUGH_ALIVE_NODES = 0x140005,
+    ILLEGAL_NODES_SET = 0x140006,
 
     // Embedded group. Group code: 21
     CLUSTER_NOT_INITIALIZED = 0x150001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp 
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index 55d6e90caf8..2456ea01710 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -305,7 +305,8 @@ sql_state error_code_to_sql_state(error::code code) {
         case error::code::ILLEGAL_PARTITION_ID:
         case error::code::PARTITION_STATE:
         case error::code::CLUSTER_NOT_IDLE:
-        case error::code::RESTART_WITH_CLEAN_UP:
+        case error::code::NOT_ENOUGH_ALIVE_NODES:
+        case error::code::ILLEGAL_NODES_SET:
             return sql_state::SHY000_GENERAL_ERROR;
 
         // Embedded group. Group code: 21
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs 
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index e10b6464a8e..b3c2120a7c9 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -706,8 +706,11 @@ namespace Apache.Ignite
             /// <summary> ClusterNotIdle error. </summary>
             public const int ClusterNotIdle = (GroupCode << 16) | (4 & 0xFFFF);
 
-            /// <summary> RestartWithCleanUp error. </summary>
-            public const int RestartWithCleanUp = (GroupCode << 16) | (5 & 
0xFFFF);
+            /// <summary> NotEnoughAliveNodes error. </summary>
+            public const int NotEnoughAliveNodes = (GroupCode << 16) | (5 & 
0xFFFF);
+
+            /// <summary> IllegalNodesSet error. </summary>
+            public const int IllegalNodesSet = (GroupCode << 16) | (6 & 
0xFFFF);
         }
 
         /// <summary> Embedded errors. </summary>
diff --git 
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
 
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
index 576acb3937b..5739c244e64 100644
--- 
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
+++ 
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
@@ -40,7 +40,8 @@ public class IgniteInternalExceptionHandler implements 
ExceptionHandler<IgniteIn
     private static final Set<Integer> BAD_REQUEST_CODES = Set.of(
             DistributionZones.ZONE_NOT_FOUND_ERR,
             DisasterRecovery.ILLEGAL_PARTITION_ID_ERR,
-            DisasterRecovery.NODES_NOT_FOUND_ERR
+            DisasterRecovery.NODES_NOT_FOUND_ERR,
+            DisasterRecovery.ILLEGAL_NODES_SET_ERR
     );
 
     @Override
diff --git 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java
 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java
index ac93564d27b..14943569d83 100644
--- 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java
+++ 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java
@@ -70,8 +70,13 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest extend
     HttpClient client;
 
     @BeforeAll
-    public void setUp() {
-        sql(String.format("CREATE ZONE \"%s\" (REPLICAS %s) storage profiles 
['%s']", FIRST_ZONE, 3, DEFAULT_AIPERSIST_PROFILE_NAME));
+    public void setUp() throws InterruptedException {
+        sql(String.format(
+                "CREATE ZONE \"%s\" (REPLICAS %s) storage profiles ['%s']",
+                FIRST_ZONE,
+                initialNodes(),
+                DEFAULT_AIPERSIST_PROFILE_NAME
+        ));
         sql(String.format("CREATE TABLE PUBLIC.\"%s\" (id INT PRIMARY KEY, val 
INT) ZONE \"%s\"", TABLE_NAME,
                 FIRST_ZONE));
 
@@ -84,8 +89,7 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest extend
 
         MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), 
unknownZone, QUALIFIED_TABLE_NAME, Set.of());
 
-        HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class,
-                () -> client.toBlocking().exchange(post));
+        HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class, () -> 
client.toBlocking().exchange(post));
 
         assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
 
@@ -100,8 +104,7 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest extend
 
         MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), 
FIRST_ZONE, tableName, Set.of());
 
-        HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class,
-                () -> client.toBlocking().exchange(post));
+        HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class, () -> 
client.toBlocking().exchange(post));
 
         assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
         assertThat(e.getMessage(), containsString("The table does not exist 
[name=" + tableName.toUpperCase() + "]"));
@@ -111,8 +114,7 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest extend
     void testRestartPartitionsWithCleanupIllegalPartitionNegative() {
         MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), 
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 5, -1, -10));
 
-        HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class,
-                () -> client.toBlocking().exchange(post));
+        HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class, () -> 
client.toBlocking().exchange(post));
 
         assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
 
@@ -123,8 +125,7 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest extend
     void testRestartPartitionsWithCleanupPartitionsOutOfRange() {
         MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), 
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(DEFAULT_PARTITION_COUNT));
 
-        HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class,
-                () -> client.toBlocking().exchange(post));
+        HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class, () -> 
client.toBlocking().exchange(post));
 
         assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
         assertThat(e.getMessage(), containsString(
@@ -139,33 +140,31 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest extend
 
     @Test
     void testRestartPartitionsWithCleanupNodesAreCaseSensitive() {
-        Set<String> uppercaseNodeNames = nodeNames(initialNodes() - 1).stream()
-                .map(String::toUpperCase)
-                .collect(toSet());
+        Set<String> uppercaseNodeName = 
Set.of(CLUSTER.aliveNode().name().toUpperCase());
 
-        MutableHttpRequest<?> post = 
restartPartitionsRequest(uppercaseNodeNames, FIRST_ZONE, QUALIFIED_TABLE_NAME, 
Set.of());
+        MutableHttpRequest<?> post = 
restartPartitionsRequest(uppercaseNodeName, FIRST_ZONE, QUALIFIED_TABLE_NAME, 
Set.of());
 
-        HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class,
-                () -> client.toBlocking().exchange(post));
+        HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class, () -> 
client.toBlocking().exchange(post));
 
         assertThat(e.getStatus(), equalTo(HttpStatus.BAD_REQUEST));
-        uppercaseNodeNames.forEach(nodeName -> assertThat(e.getMessage(), 
containsString(nodeName)));
+        uppercaseNodeName.forEach(nodeName -> assertThat(e.getMessage(), 
containsString(nodeName)));
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
-    public void testRestartAllPartitionsWithCleanup() {
+    public void testPartitionsWithCleanupEmptySetOfNodes() {
         MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), 
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of());
 
-        HttpResponse<Void> response = client.toBlocking().exchange(post);
+        HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class, () -> 
client.toBlocking().exchange(post));
 
-        assertThat(response.getStatus().getCode(), is(OK.code()));
+        assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
+        assertThat(e.getMessage(), containsString("Only one node name should 
be specified for the operation."));
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
     public void testRestartSpecifiedPartitionsWithCleanup() {
-        MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(), 
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 1));
+        Set<String> nodeName = Set.of(CLUSTER.aliveNode().name());
+
+        MutableHttpRequest<?> post = restartPartitionsRequest(nodeName, 
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 1));
 
         HttpResponse<Void> response = client.toBlocking().exchange(post);
 
@@ -173,25 +172,50 @@ public class 
ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest extend
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
     public void testRestartPartitionsWithCleanupByNodes() {
-        Set<String> nodeNames = nodeNames(initialNodes() - 1);
+        Set<String> nodeNames = nodeNames(initialNodes());
 
         MutableHttpRequest<?> post = restartPartitionsRequest(nodeNames, 
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of());
 
-        HttpResponse<Void> response = client.toBlocking().exchange(post);
+        HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class, () -> 
client.toBlocking().exchange(post));
 
-        assertThat(response.getStatus().getCode(), is(OK.code()));
+        assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
+        assertThat(e.getMessage(), containsString("Only one node name should 
be specified for the operation."));
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
     public void testRestartTablePartitionsWithCleanupByNodes() {
         Set<String> nodeNames = nodeNames(initialNodes() - 1);
 
         MutableHttpRequest<?> post = 
HttpRequest.POST(RESTART_PARTITIONS_WITH_CLEANUP_ENDPOINT,
                 new RestartPartitionsRequest(nodeNames, FIRST_ZONE, 
QUALIFIED_TABLE_NAME, Set.of()));
 
+        HttpClientResponseException e = 
assertThrows(HttpClientResponseException.class, () -> 
client.toBlocking().exchange(post));
+
+        assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
+        assertThat(e.getMessage(), containsString("Only one node name should 
be specified for the operation."));
+    }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26638")
+    public void testRestartPartitionsWithCleanupAllPartitions() {
+        Set<String> nodeName = Set.of(CLUSTER.aliveNode().name());
+
+        MutableHttpRequest<?> post = restartPartitionsRequest(nodeName, 
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of());
+
+        HttpResponse<Void> response = client.toBlocking().exchange(post);
+
+        assertThat(response.getStatus().getCode(), is(OK.code()));
+    }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26638")
+    public void testRestartTablePartitionsWithCleanupAllPartitions() {
+        Set<String> nodeName = Set.of(CLUSTER.aliveNode().name());
+
+        MutableHttpRequest<?> post = 
HttpRequest.POST(RESTART_PARTITIONS_WITH_CLEANUP_ENDPOINT,
+                new RestartPartitionsRequest(nodeName, FIRST_ZONE, 
QUALIFIED_TABLE_NAME, Set.of()));
+
         HttpResponse<Void> response = client.toBlocking().exchange(post);
 
         assertThat(response.getStatus().getCode(), is(OK.code()));
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
index 234ed90c073..71d4ceefb30 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
@@ -320,6 +320,7 @@ public class ItHighAvailablePartitionsRecoveryTest extends 
AbstractHighAvailable
     }
 
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26657")
     void testRebalanceInHaZone() throws InterruptedException {
         createHaZoneWithTable();
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index a8e8c1dfa69..cf18db038f0 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -46,6 +46,8 @@ import static 
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
 import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static 
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.PARTITION_STATE_ERR;
 
 import java.util.ArrayList;
@@ -123,10 +125,13 @@ import 
org.apache.ignite.internal.systemview.api.SystemViewProvider;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
+import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalNodesException;
 import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalPartitionIdException;
 import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.NodesNotFoundException;
+import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.NotEnoughAliveNodesException;
 import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.ZonesNotFoundException;
 import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.versioned.VersionedSerialization;
 import org.apache.ignite.lang.TableNotFoundException;
 import org.apache.ignite.raft.jraft.RaftGroupService;
@@ -168,6 +173,9 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
      */
     private static final int CATCH_UP_THRESHOLD = 100;
 
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
     /** Thread pool executor for async parts. */
     private final ExecutorService threadPool;
 
@@ -246,39 +254,43 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
         this.nodeProperties = nodeProperties;
         this.systemViewManager = systemViewManager;
 
-        watchListener = event -> {
+        watchListener = event -> inBusyLock(busyLock, () -> {
             handleTriggerKeyUpdate(event);
 
             // There is no need to block a watch thread any longer.
             return nullCompletedFuture();
-        };
+        });
     }
 
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
-        systemViewManager.register(this);
+        return inBusyLock(busyLock, () -> {
+            systemViewManager.register(this);
 
-        
messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, 
this::handleMessage);
+            
messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, 
this::handleMessage);
 
-        metaStorageManager.registerExactWatch(RECOVERY_TRIGGER_KEY, 
watchListener);
+            metaStorageManager.registerExactWatch(RECOVERY_TRIGGER_KEY, 
watchListener);
 
-        if (!nodeProperties.colocationEnabled()) {
-            dzManager.listen(HaZoneTopologyUpdateEvent.TOPOLOGY_REDUCED, 
this::onHaZoneTablePartitionTopologyReduce);
-        } else {
-            dzManager.listen(HaZoneTopologyUpdateEvent.TOPOLOGY_REDUCED, 
this::onHaZonePartitionTopologyReduce);
-        }
+            if (!nodeProperties.colocationEnabled()) {
+                dzManager.listen(HaZoneTopologyUpdateEvent.TOPOLOGY_REDUCED, 
this::onHaZoneTablePartitionTopologyReduce);
+            } else {
+                dzManager.listen(HaZoneTopologyUpdateEvent.TOPOLOGY_REDUCED, 
this::onHaZonePartitionTopologyReduce);
+            }
 
-        catalogManager.listen(TABLE_CREATE, fromConsumer(this::onTableCreate));
+            catalogManager.listen(TABLE_CREATE, 
fromConsumer(this::onTableCreate));
 
-        catalogManager.listen(TABLE_DROP, fromConsumer(this::onTableDrop));
+            catalogManager.listen(TABLE_DROP, fromConsumer(this::onTableDrop));
 
-        registerMetricSources();
+            registerMetricSources();
 
-        return nullCompletedFuture();
+            return nullCompletedFuture();
+        });
     }
 
     @Override
     public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
+        busyLock.block();
+
         metaStorageManager.unregisterWatch(watchListener);
 
         for (CompletableFuture<Void> future : ongoingOperationsById.values()) {
@@ -310,60 +322,70 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
         return ongoingOperationsById;
     }
 
-    private CompletableFuture<Boolean> 
onHaZoneTablePartitionTopologyReduce(HaZoneTopologyUpdateEventParams params) {
-        int zoneId = params.zoneId();
-        long revision = params.causalityToken();
-        long timestamp = 
metaStorageManager.timestampByRevisionLocally(revision).longValue();
-
-        Catalog catalog = catalogManager.activeCatalog(timestamp);
-        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+    public IgniteSpinBusyLock busyLock() {
+        return busyLock;
+    }
 
-        Map<Integer, Set<Integer>> tablePartitionsToReset = new HashMap<>();
-        for (CatalogTableDescriptor table : catalog.tables(zoneId)) {
-            Set<Integer> partitionsToReset = new HashSet<>();
-            for (int partId = 0; partId < zoneDescriptor.partitions(); 
partId++) {
-                TablePartitionId partitionId = new 
TablePartitionId(table.id(), partId);
+    private CompletableFuture<Boolean> 
onHaZoneTablePartitionTopologyReduce(HaZoneTopologyUpdateEventParams params) {
+        return inBusyLock(busyLock, () -> {
+            int zoneId = params.zoneId();
+            long revision = params.causalityToken();
+            long timestamp = 
metaStorageManager.timestampByRevisionLocally(revision).longValue();
+
+            Catalog catalog = catalogManager.activeCatalog(timestamp);
+            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+
+            Map<Integer, Set<Integer>> tablePartitionsToReset = new 
HashMap<>();
+            for (CatalogTableDescriptor table : catalog.tables(zoneId)) {
+                Set<Integer> partitionsToReset = new HashSet<>();
+                for (int partId = 0; partId < zoneDescriptor.partitions(); 
partId++) {
+                    TablePartitionId partitionId = new 
TablePartitionId(table.id(), partId);
+
+                    if (stableAssignmentsWithOnlyAliveNodes(partitionId, 
revision, false).size() < calculateQuorum(
+                            zoneDescriptor.replicas())) {
+                        partitionsToReset.add(partId);
+                    }
+                }
 
-                if (stableAssignmentsWithOnlyAliveNodes(partitionId, revision, 
false).size() < calculateQuorum(zoneDescriptor.replicas())) {
-                    partitionsToReset.add(partId);
+                if (!partitionsToReset.isEmpty()) {
+                    tablePartitionsToReset.put(table.id(), partitionsToReset);
                 }
             }
 
-            if (!partitionsToReset.isEmpty()) {
-                tablePartitionsToReset.put(table.id(), partitionsToReset);
+            if (!tablePartitionsToReset.isEmpty()) {
+                return resetPartitions(zoneDescriptor.name(), 
tablePartitionsToReset, false, revision, false).thenApply(r -> false);
+            } else {
+                return falseCompletedFuture();
             }
-        }
-
-        if (!tablePartitionsToReset.isEmpty()) {
-            return resetPartitions(zoneDescriptor.name(), 
tablePartitionsToReset, false, revision, false).thenApply(r -> false);
-        } else {
-            return falseCompletedFuture();
-        }
+        });
     }
 
     private CompletableFuture<Boolean> 
onHaZonePartitionTopologyReduce(HaZoneTopologyUpdateEventParams params) {
-        int zoneId = params.zoneId();
-        long revision = params.causalityToken();
-        long timestamp = 
metaStorageManager.timestampByRevisionLocally(revision).longValue();
+        return inBusyLock(busyLock, () -> {
+            int zoneId = params.zoneId();
+            long revision = params.causalityToken();
+            long timestamp = 
metaStorageManager.timestampByRevisionLocally(revision).longValue();
 
-        Catalog catalog = catalogManager.activeCatalog(timestamp);
-        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+            Catalog catalog = catalogManager.activeCatalog(timestamp);
+            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
 
-        Set<Integer> partitionsToReset = new HashSet<>();
+            Set<Integer> partitionsToReset = new HashSet<>();
 
-        for (int partId = 0; partId < zoneDescriptor.partitions(); partId++) {
-            ZonePartitionId partitionId = new ZonePartitionId(zoneId, partId);
+            for (int partId = 0; partId < zoneDescriptor.partitions(); 
partId++) {
+                ZonePartitionId partitionId = new ZonePartitionId(zoneId, 
partId);
 
-            if (stableAssignmentsWithOnlyAliveNodes(partitionId, revision, 
true).size() < calculateQuorum(zoneDescriptor.replicas())) {
-                partitionsToReset.add(partId);
+                if (stableAssignmentsWithOnlyAliveNodes(partitionId, revision, 
true).size() < calculateQuorum(zoneDescriptor.replicas())) {
+                    partitionsToReset.add(partId);
+                }
             }
-        }
 
-        if (!partitionsToReset.isEmpty()) {
-            return resetPartitions(zoneDescriptor.name(), Map.of(zoneId, 
partitionsToReset), false, revision, true).thenApply(r -> false);
-        } else {
-            return falseCompletedFuture();
-        }
+            if (!partitionsToReset.isEmpty()) {
+                return resetPartitions(zoneDescriptor.name(), Map.of(zoneId, 
partitionsToReset), false, revision, true).thenApply(
+                        r -> false);
+            } else {
+                return falseCompletedFuture();
+            }
+        });
     }
 
     private Set<Assignment> 
stableAssignmentsWithOnlyAliveNodes(ReplicationGroupId partitionId, long 
revision, boolean colocationEnabled) {
@@ -403,9 +425,11 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
      * @return Future that completes when partitions are reset.
      */
     public CompletableFuture<Void> resetTablePartitions(String zoneName, 
String schemaName, String tableName, Set<Integer> partitionIds) {
-        int tableId = tableDescriptor(catalogLatestVersion(), schemaName, 
tableName).id();
+        return inBusyLock(busyLock, () -> {
+            int tableId = tableDescriptor(catalogLatestVersion(), schemaName, 
tableName).id();
 
-        return resetPartitions(zoneName, Map.of(tableId, partitionIds), true, 
-1, false);
+            return resetPartitions(zoneName, Map.of(tableId, partitionIds), 
true, -1, false);
+        });
     }
 
     /**
@@ -430,9 +454,11 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             boolean manualUpdate,
             long triggerRevision
     ) {
-        int tableId = tableDescriptor(catalogLatestVersion(), schemaName, 
tableName).id();
+        return inBusyLock(busyLock, () -> {
+            int tableId = tableDescriptor(catalogLatestVersion(), schemaName, 
tableName).id();
 
-        return resetPartitions(zoneName, Map.of(tableId, partitionIds), 
manualUpdate, triggerRevision, false);
+            return resetPartitions(zoneName, Map.of(tableId, partitionIds), 
manualUpdate, triggerRevision, false);
+        });
     }
 
     /**
@@ -446,9 +472,11 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
      * @return Future that completes when partitions are reset.
      */
     public CompletableFuture<Void> resetPartitions(String zoneName, 
Set<Integer> partitionIds) {
-        int zoneId = zoneDescriptor(catalogLatestVersion(), zoneName).id();
+        return inBusyLock(busyLock, () -> {
+            int zoneId = zoneDescriptor(catalogLatestVersion(), zoneName).id();
 
-        return resetPartitions(zoneName, Map.of(zoneId, partitionIds), true, 
-1, true);
+            return resetPartitions(zoneName, Map.of(zoneId, partitionIds), 
true, -1, true);
+        });
     }
 
     /**
@@ -469,9 +497,11 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             boolean manualUpdate,
             long triggerRevision
     ) {
-        int zoneId = zoneDescriptor(catalogLatestVersion(), zoneName).id();
+        return inBusyLock(busyLock, () -> {
+            int zoneId = zoneDescriptor(catalogLatestVersion(), zoneName).id();
 
-        return resetPartitions(zoneName, Map.of(zoneId, partitionIds), 
manualUpdate, triggerRevision, true);
+            return resetPartitions(zoneName, Map.of(zoneId, partitionIds), 
manualUpdate, triggerRevision, true);
+        });
     }
 
     /**
@@ -494,27 +524,29 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             long triggerRevision,
             boolean colocationEnabled
     ) {
-        try {
-            Catalog catalog = catalogLatestVersion();
-
-            CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
-
-            partitionIds.values().forEach(ids -> checkPartitionsRange(ids, 
Set.of(zone)));
-
-            return processNewRequest(
-                    GroupUpdateRequest.create(
-                            UUID.randomUUID(),
-                            catalog.version(),
-                            zone.id(),
-                            partitionIds,
-                            manualUpdate,
-                            colocationEnabled
-                    ),
-                    triggerRevision
-            );
-        } catch (Throwable t) {
-            return failedFuture(t);
-        }
+        return inBusyLock(busyLock, () -> {
+            try {
+                Catalog catalog = catalogLatestVersion();
+
+                CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
+
+                partitionIds.values().forEach(ids -> checkPartitionsRange(ids, 
Set.of(zone)));
+
+                return processNewRequest(
+                        GroupUpdateRequest.create(
+                                UUID.randomUUID(),
+                                catalog.version(),
+                                zone.id(),
+                                partitionIds,
+                                manualUpdate,
+                                colocationEnabled
+                        ),
+                        triggerRevision
+                );
+            } catch (Throwable t) {
+                return failedFuture(t);
+            }
+        });
     }
 
     /**
@@ -534,36 +566,38 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             String tableName,
             Set<Integer> partitionIds
     ) {
-        try {
-            // Validates passed node names.
-            getNodes(nodeNames);
+        return inBusyLock(busyLock, () -> {
+            try {
+                // Validates passed node names.
+                getNodes(nodeNames);
 
-            Catalog catalog = catalogLatestVersion();
+                Catalog catalog = catalogLatestVersion();
 
-            CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
+                CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
 
-            CatalogTableDescriptor table = tableDescriptor(catalog, 
schemaName, tableName);
+                CatalogTableDescriptor table = tableDescriptor(catalog, 
schemaName, tableName);
 
-            checkPartitionsRange(partitionIds, Set.of(zone));
+                checkPartitionsRange(partitionIds, Set.of(zone));
 
-            return processNewRequest(new ManualGroupRestartRequest(
-                    UUID.randomUUID(),
-                    zone.id(),
-                    table.id(),
-                    partitionIds,
-                    nodeNames,
-                    catalog.time(),
-                    false
-            ));
-        } catch (Throwable t) {
-            return failedFuture(t);
-        }
+                return processNewRequest(new ManualGroupRestartRequest(
+                        UUID.randomUUID(),
+                        zone.id(),
+                        table.id(),
+                        partitionIds,
+                        nodeNames,
+                        catalog.time(),
+                        false
+                ));
+            } catch (Throwable t) {
+                return failedFuture(t);
+            }
+        });
     }
 
     /**
      * Restarts replica service and raft group of passed partitions with 
cleaning up partition storages.
      *
-     * @param nodeNames Names specifying nodes to restart partitions. 
Case-sensitive, empty set means "all nodes".
+     * @param nodeNames Names specifying nodes to restart partitions. Only one 
node is allowed.
      * @param zoneName Name of the distribution zone. Case-sensitive, without 
quotes.
      * @param schemaName Schema name. Case-sensitive, without quotes.
      * @param tableName Table name. Case-sensitive, without quotes.
@@ -577,30 +611,34 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             String tableName,
             Set<Integer> partitionIds
     ) {
-        try {
-            // Validates passed node names.
-            getNodes(nodeNames);
+        return inBusyLock(busyLock, () -> {
+            try {
+                // Validates passed node names.
+                getNodes(nodeNames);
 
-            Catalog catalog = catalogLatestVersion();
+                Catalog catalog = catalogLatestVersion();
 
-            CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
+                CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
 
-            CatalogTableDescriptor table = tableDescriptor(catalog, 
schemaName, tableName);
+                CatalogTableDescriptor table = tableDescriptor(catalog, 
schemaName, tableName);
 
-            checkPartitionsRange(partitionIds, Set.of(zone));
+                checkPartitionsRange(partitionIds, Set.of(zone));
 
-            return processNewRequest(new ManualGroupRestartRequest(
-                    UUID.randomUUID(),
-                    zone.id(),
-                    table.id(),
-                    partitionIds,
-                    nodeNames,
-                    catalog.time(),
-                    true
-            ));
-        } catch (Throwable t) {
-            return failedFuture(t);
-        }
+                checkOnlyOneNodeSpecified(nodeNames);
+
+                return processNewRequest(new ManualGroupRestartRequest(
+                        UUID.randomUUID(),
+                        zone.id(),
+                        table.id(),
+                        partitionIds,
+                        nodeNames,
+                        catalog.time(),
+                        true
+                ));
+            } catch (Throwable t) {
+                return failedFuture(t);
+            }
+        });
     }
 
     /**
@@ -616,37 +654,39 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             String zoneName,
             Set<Integer> partitionIds
     ) {
-        try {
-            // Validates passed node names.
-            getNodes(nodeNames);
-
-            Catalog catalog = catalogLatestVersion();
-
-            CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
-
-            checkPartitionsRange(partitionIds, Set.of(zone));
-
-            return processNewRequest(new ManualGroupRestartRequest(
-                    UUID.randomUUID(),
-                    zone.id(),
-                    // We pass here -1 as table id because it is not used for 
zone-based partitions.
-                    // We expect that the field will be removed once 
colocation track is finished.
-                    // TODO: https://issues.apache.org/jira/browse/IGNITE-22522
-                    -1,
-                    partitionIds,
-                    nodeNames,
-                    catalog.time(),
-                    false
-            ));
-        } catch (Throwable t) {
-            return failedFuture(t);
-        }
+        return inBusyLock(busyLock, () -> {
+            try {
+                // Validates passed node names.
+                getNodes(nodeNames);
+
+                Catalog catalog = catalogLatestVersion();
+
+                CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
+
+                checkPartitionsRange(partitionIds, Set.of(zone));
+
+                return processNewRequest(new ManualGroupRestartRequest(
+                        UUID.randomUUID(),
+                        zone.id(),
+                        // We pass here -1 as table id because it is not used 
for zone-based partitions.
+                        // We expect that the field will be removed once 
colocation track is finished.
+                        // TODO: 
https://issues.apache.org/jira/browse/IGNITE-22522
+                        -1,
+                        partitionIds,
+                        nodeNames,
+                        catalog.time(),
+                        false
+                ));
+            } catch (Throwable t) {
+                return failedFuture(t);
+            }
+        });
     }
 
     /**
      * Restart partitions of a zone with cleanup. This method destroys 
partition storage during restart.
      *
-     * @param nodeNames Names of nodes to restart partitions on. If empty, 
restart on all nodes.
+     * @param nodeNames Names of nodes to restart partitions on. Only one node 
is allowed.
      * @param zoneName Zone name. Case-sensitive, without quotes.
      * @param partitionIds IDs of partitions to restart. If empty, restart all 
zone's partitions.
      * @return Future that completes when partitions are restarted.
@@ -656,31 +696,35 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             String zoneName,
             Set<Integer> partitionIds
     ) {
-        try {
-            // Validates passed node names.
-            getNodes(nodeNames);
-
-            Catalog catalog = catalogLatestVersion();
-
-            CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
-
-            checkPartitionsRange(partitionIds, Set.of(zone));
-
-            return processNewRequest(new ManualGroupRestartRequest(
-                    UUID.randomUUID(),
-                    zone.id(),
-                    // We pass here -1 as table id because it is not used for 
zone-based partitions.
-                    // We expect that the field will be removed once 
colocation track is finished.
-                    // TODO: https://issues.apache.org/jira/browse/IGNITE-22522
-                    -1,
-                    partitionIds,
-                    nodeNames,
-                    catalog.time(),
-                    true
-            ));
-        } catch (Throwable t) {
-            return failedFuture(t);
-        }
+        return inBusyLock(busyLock, () -> {
+            try {
+                // Validates passed node names.
+                getNodes(nodeNames);
+
+                Catalog catalog = catalogLatestVersion();
+
+                CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
+
+                checkPartitionsRange(partitionIds, Set.of(zone));
+
+                checkOnlyOneNodeSpecified(nodeNames);
+
+                return processNewRequest(new ManualGroupRestartRequest(
+                        UUID.randomUUID(),
+                        zone.id(),
+                        // We pass here -1 as table id because it is not used 
for zone-based partitions.
+                        // We expect that the field will be removed once 
colocation track is finished.
+                        // TODO: 
https://issues.apache.org/jira/browse/IGNITE-22522
+                        -1,
+                        partitionIds,
+                        nodeNames,
+                        catalog.time(),
+                        true
+                ));
+            } catch (Throwable t) {
+                return failedFuture(t);
+            }
+        });
     }
 
     /**
@@ -697,21 +741,23 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             Set<String> nodeNames,
             Set<Integer> partitionIds
     ) {
-        try {
-            assert nodeProperties.colocationEnabled() : "Zone based 
replication is unavailable use localTablePartitionStates";
-
-            Catalog catalog = catalogLatestVersion();
-
-            return localPartitionStatesInternal(
-                    zoneNames,
-                    nodeNames,
-                    partitionIds,
-                    catalog,
-                    zoneState()
-            ).thenApply(res -> normalizeLocal(res, catalog));
-        } catch (Throwable t) {
-            return failedFuture(t);
-        }
+        return inBusyLock(busyLock, () -> {
+            try {
+                assert nodeProperties.colocationEnabled() : "Zone based 
replication is unavailable use localTablePartitionStates";
+
+                Catalog catalog = catalogLatestVersion();
+
+                return localPartitionStatesInternal(
+                        zoneNames,
+                        nodeNames,
+                        partitionIds,
+                        catalog,
+                        zoneState()
+                ).thenApply(res -> normalizeLocal(res, catalog));
+            } catch (Throwable t) {
+                return failedFuture(t);
+            }
+        });
     }
 
     /**
@@ -725,23 +771,25 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             Set<String> zoneNames,
             Set<Integer> partitionIds
     ) {
-        try {
-            assert nodeProperties.colocationEnabled() : "Zone based 
replication is unavailable use globalTablePartitionStates";
-
-            Catalog catalog = catalogLatestVersion();
-
-            return localPartitionStatesInternal(
-                    zoneNames,
-                    Set.of(),
-                    partitionIds,
-                    catalog,
-                    zoneState()
-            )
-                    .thenApply(res -> normalizeLocal(res, catalog))
-                    .thenApply(res -> assembleGlobal(res, partitionIds, 
catalog));
-        } catch (Throwable t) {
-            return failedFuture(t);
-        }
+        return inBusyLock(busyLock, () -> {
+            try {
+                assert nodeProperties.colocationEnabled() : "Zone based 
replication is unavailable use globalTablePartitionStates";
+
+                Catalog catalog = catalogLatestVersion();
+
+                return localPartitionStatesInternal(
+                        zoneNames,
+                        Set.of(),
+                        partitionIds,
+                        catalog,
+                        zoneState()
+                )
+                        .thenApply(res -> normalizeLocal(res, catalog))
+                        .thenApply(res -> assembleGlobal(res, partitionIds, 
catalog));
+            } catch (Throwable t) {
+                return failedFuture(t);
+            }
+        });
     }
 
     static Function<LocalPartitionStateMessage, ZonePartitionId> zoneState() {
@@ -755,56 +803,60 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             Catalog catalog,
             Function<LocalPartitionStateMessage, T> keyExtractor
     ) {
-        Collection<CatalogZoneDescriptor> zones = filterZones(zoneNames, 
catalog.zones());
+        return inBusyLock(busyLock, () -> {
+            Collection<CatalogZoneDescriptor> zones = filterZones(zoneNames, 
catalog.zones());
 
-        checkPartitionsRange(partitionIds, zones);
+            checkPartitionsRange(partitionIds, zones);
 
-        Set<NodeWithAttributes> nodes = getNodes(nodeNames);
+            Set<NodeWithAttributes> nodes = getNodes(nodeNames);
 
-        Set<Integer> zoneIds = 
zones.stream().map(CatalogObjectDescriptor::id).collect(toSet());
+            Set<Integer> zoneIds = 
zones.stream().map(CatalogObjectDescriptor::id).collect(toSet());
 
-        LocalPartitionStatesRequest localPartitionStatesRequest = 
PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStatesRequest()
-                .zoneIds(zoneIds)
-                .partitionIds(partitionIds)
-                .catalogVersion(catalog.version())
-                .build();
+            LocalPartitionStatesRequest localPartitionStatesRequest = 
PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStatesRequest()
+                    .zoneIds(zoneIds)
+                    .partitionIds(partitionIds)
+                    .catalogVersion(catalog.version())
+                    .build();
 
-        Map<T, LocalPartitionStateMessageByNode> result = new 
ConcurrentHashMap<>();
-        CompletableFuture<?>[] futures = new CompletableFuture[nodes.size()];
+            Map<T, LocalPartitionStateMessageByNode> result = new 
ConcurrentHashMap<>();
+            CompletableFuture<?>[] futures = new 
CompletableFuture[nodes.size()];
 
-        int i = 0;
-        for (NodeWithAttributes node : nodes) {
-            CompletableFuture<NetworkMessage> invokeFuture = 
messagingService.invoke(
-                    node.nodeName(),
-                    localPartitionStatesRequest,
-                    TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS)
-            );
+            int i = 0;
+            for (NodeWithAttributes node : nodes) {
+                CompletableFuture<NetworkMessage> invokeFuture = 
messagingService.invoke(
+                        node.nodeName(),
+                        localPartitionStatesRequest,
+                        TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS)
+                );
 
-            futures[i++] = invokeFuture.thenAccept(networkMessage -> {
-                assert networkMessage instanceof LocalPartitionStatesResponse 
: networkMessage;
+                futures[i++] = invokeFuture.thenAccept(networkMessage -> {
+                    inBusyLock(busyLock, () -> {
+                        assert networkMessage instanceof 
LocalPartitionStatesResponse : networkMessage;
 
-                var response = (LocalPartitionStatesResponse) networkMessage;
+                        var response = (LocalPartitionStatesResponse) 
networkMessage;
 
-                for (LocalPartitionStateMessage state : response.states()) {
-                    result.compute(keyExtractor.apply(state), (partitionId, 
messageByNode) -> {
-                        if (messageByNode == null) {
-                            return new 
LocalPartitionStateMessageByNode(Map.of(node.nodeName(), state));
-                        }
+                        for (LocalPartitionStateMessage state : 
response.states()) {
+                            result.compute(keyExtractor.apply(state), 
(partitionId, messageByNode) -> {
+                                if (messageByNode == null) {
+                                    return new 
LocalPartitionStateMessageByNode(Map.of(node.nodeName(), state));
+                                }
 
-                        messageByNode = new 
LocalPartitionStateMessageByNode(messageByNode);
-                        messageByNode.put(node.nodeName(), state);
-                        return messageByNode;
+                                messageByNode = new 
LocalPartitionStateMessageByNode(messageByNode);
+                                messageByNode.put(node.nodeName(), state);
+                                return messageByNode;
+                            });
+                        }
                     });
-                }
-            });
-        }
-
-        return allOf(futures).handle((unused, err) -> {
-            if (err != null) {
-                throw new DisasterRecoveryException(PARTITION_STATE_ERR, err);
+                });
             }
 
-            return result;
+            return allOf(futures).handle((unused, err) -> {
+                if (err != null) {
+                    throw new DisasterRecoveryException(PARTITION_STATE_ERR, 
err);
+                }
+
+                return result;
+            });
         });
     }
 
@@ -822,34 +874,36 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             Set<String> nodeNames,
             Set<Integer> partitionIds
     ) {
-        try {
-            Catalog catalog = catalogLatestVersion();
+        return inBusyLock(busyLock, () -> {
+            try {
+                Catalog catalog = catalogLatestVersion();
+
+                if (nodeProperties.colocationEnabled()) {
+                    return localPartitionStatesInternal(
+                            zoneNames,
+                            nodeNames,
+                            partitionIds,
+                            catalog,
+                            zoneState()
+                    )
+                            .thenCompose(res -> 
tableStateForZone(toZonesOnNodes(res), catalog.version())
+                                    .thenApply(tableState -> 
zoneStateToTableState(res, tableState, catalog))
+                            )
+                            .thenApply(res -> normalizeTableLocal(res, 
catalog));
+                }
 
-            if (nodeProperties.colocationEnabled()) {
                 return localPartitionStatesInternal(
                         zoneNames,
                         nodeNames,
                         partitionIds,
                         catalog,
-                        zoneState()
+                        tableState()
                 )
-                        .thenCompose(res -> 
tableStateForZone(toZonesOnNodes(res), catalog.version())
-                                .thenApply(tableState -> 
zoneStateToTableState(res, tableState, catalog))
-                        )
                         .thenApply(res -> normalizeTableLocal(res, catalog));
+            } catch (Throwable t) {
+                return failedFuture(t);
             }
-
-            return localPartitionStatesInternal(
-                    zoneNames,
-                    nodeNames,
-                    partitionIds,
-                    catalog,
-                    tableState()
-            )
-                    .thenApply(res -> normalizeTableLocal(res, catalog));
-        } catch (Throwable t) {
-            return failedFuture(t);
-        }
+        });
     }
 
     /**
@@ -863,36 +917,38 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             Set<String> zoneNames,
             Set<Integer> partitionIds
     ) {
-        try {
-            Catalog catalog = catalogLatestVersion();
+        return inBusyLock(busyLock, () -> {
+            try {
+                Catalog catalog = catalogLatestVersion();
+
+                if (nodeProperties.colocationEnabled()) {
+                    return localPartitionStatesInternal(
+                            zoneNames,
+                            Set.of(),
+                            partitionIds,
+                            catalog,
+                            zoneState()
+                    )
+                            .thenCompose(res -> 
tableStateForZone(toZonesOnNodes(res), catalog.version())
+                                    .thenApply(tableState -> 
zoneStateToTableState(res, tableState, catalog))
+                            )
+                            .thenApply(res -> normalizeTableLocal(res, 
catalog))
+                            .thenApply(res -> assembleTableGlobal(res, 
partitionIds, catalog));
+                }
 
-            if (nodeProperties.colocationEnabled()) {
                 return localPartitionStatesInternal(
                         zoneNames,
                         Set.of(),
                         partitionIds,
                         catalog,
-                        zoneState()
+                        tableState()
                 )
-                        .thenCompose(res -> 
tableStateForZone(toZonesOnNodes(res), catalog.version())
-                                .thenApply(tableState -> 
zoneStateToTableState(res, tableState, catalog))
-                        )
                         .thenApply(res -> normalizeTableLocal(res, catalog))
                         .thenApply(res -> assembleTableGlobal(res, 
partitionIds, catalog));
+            } catch (Throwable t) {
+                return failedFuture(t);
             }
-
-            return localPartitionStatesInternal(
-                    zoneNames,
-                    Set.of(),
-                    partitionIds,
-                    catalog,
-                    tableState()
-            )
-                    .thenApply(res -> normalizeTableLocal(res, catalog))
-                    .thenApply(res -> assembleTableGlobal(res, partitionIds, 
catalog));
-        } catch (Throwable t) {
-            return failedFuture(t);
-        }
+        });
     }
 
     /**
@@ -1064,6 +1120,12 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
         });
     }
 
+    private static void checkOnlyOneNodeSpecified(Set<String> nodeNames) {
+        if (nodeNames.size() != 1) {
+            throw new IllegalNodesException();
+        }
+    }
+
     private Set<NodeWithAttributes> getNodes(Set<String> nodeNames) throws 
NodesNotFoundException {
         if (nodeNames.isEmpty()) {
             return dzManager.logicalTopology();
@@ -1212,19 +1274,34 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
                     return;
                 }
 
-                request.handle(this, watchEvent.revision(), 
watchEvent.timestamp()).whenComplete(copyStateTo(operationFuture));
+                request.handle(this, watchEvent.revision(), 
watchEvent.timestamp())
+                        .handle((res, ex) -> inBusyLock(busyLock, () -> {
+                            copyStateTo(operationFuture).accept(res, ex);
+
+                            if (ex != null) {
+                                if (!hasCause(ex, 
NodeStoppingException.class)) {
+                                    failureManager.process(new 
FailureContext(ex, "Unable to handle disaster recovery request."));
+                                }
+                            }
 
+                            return null;
+                        }));
                 break;
             case MULTI_NODE:
-                CompletableFuture<Void> handleFuture = request.handle(this, 
watchEvent.revision(), watchEvent.timestamp());
-
-                if (operationFuture == null) {
-                    // We're not the initiator, or timeout has passed.
-                    return;
-                }
-
-                handleFuture.whenComplete(copyStateTo(operationFuture));
-
+                request.handle(this, watchEvent.revision(), 
watchEvent.timestamp())
+                        .handle((res, ex) -> inBusyLock(busyLock, () -> {
+                            if (operationFuture != null) {
+                                copyStateTo(operationFuture).accept(res, ex);
+                            }
+
+                            if (ex != null) {
+                                if (!hasCause(ex, NodeStoppingException.class, 
NotEnoughAliveNodesException.class)) {
+                                    failureManager.process(new 
FailureContext(ex, "Unable to handle disaster recovery request."));
+                                }
+                            }
+
+                            return null;
+                        }));
                 break;
             default:
                 var error = new AssertionError("Unexpected request type: " + 
request.getClass());
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
index 0e2e59a9dbd..213d39f4023 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
@@ -42,6 +42,7 @@ import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterReco
 import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.zoneState;
 import static 
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static 
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.CLUSTER_NOT_IDLE_ERR;
 
 import java.util.ArrayList;
@@ -100,64 +101,67 @@ abstract class GroupUpdateRequestHandler<T extends 
PartitionGroupId> {
     }
 
     public CompletableFuture<Void> handle(DisasterRecoveryManager 
disasterRecoveryManager, long msRevision, HybridTimestamp msTimestamp) {
-        int catalogVersion = 
disasterRecoveryManager.catalogManager.activeCatalogVersion(msTimestamp.longValue());
+        return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+            int catalogVersion = 
disasterRecoveryManager.catalogManager.activeCatalogVersion(msTimestamp.longValue());
 
-        if (request.catalogVersion() != catalogVersion) {
-            return failedFuture(
-                    new DisasterRecoveryException(CLUSTER_NOT_IDLE_ERR, 
"Cluster is not idle, concurrent DDL update detected.")
-            );
-        }
+            if (request.catalogVersion() != catalogVersion) {
+                return failedFuture(
+                        new DisasterRecoveryException(CLUSTER_NOT_IDLE_ERR, 
"Cluster is not idle, concurrent DDL update detected.")
+                );
+            }
 
-        Catalog catalog = 
disasterRecoveryManager.catalogManager.catalog(catalogVersion);
+            Catalog catalog = 
disasterRecoveryManager.catalogManager.catalog(catalogVersion);
 
-        int zoneId = request.zoneId();
+            int zoneId = request.zoneId();
 
-        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
 
-        Set<Integer> allZonePartitionsToReset = new HashSet<>();
-        
request.partitionIds().values().forEach(allZonePartitionsToReset::addAll);
+            Set<Integer> allZonePartitionsToReset = new HashSet<>();
+            
request.partitionIds().values().forEach(allZonePartitionsToReset::addAll);
 
-        CompletableFuture<Set<String>> dataNodesFuture =
-                disasterRecoveryManager.dzManager.dataNodes(msTimestamp, 
catalogVersion, zoneId);
+            CompletableFuture<Set<String>> dataNodesFuture =
+                    disasterRecoveryManager.dzManager.dataNodes(msTimestamp, 
catalogVersion, zoneId);
 
-        CompletableFuture<Map<T, LocalPartitionStateMessageByNode>> 
localStatesFuture =
-                localStatesFuture(disasterRecoveryManager, 
Set.of(zoneDescriptor.name()), allZonePartitionsToReset, catalog);
+            CompletableFuture<Map<T, LocalPartitionStateMessageByNode>> 
localStatesFuture =
+                    localStatesFuture(disasterRecoveryManager, 
Set.of(zoneDescriptor.name()), allZonePartitionsToReset, catalog);
 
-        return dataNodesFuture.thenCombine(localStatesFuture, (dataNodes, 
localStatesMap) -> {
-            Set<String> nodeConsistentIds = 
disasterRecoveryManager.dzManager.logicalTopology(msRevision)
-                    .stream()
-                    .map(NodeWithAttributes::nodeName)
-                    .collect(toSet());
+            return dataNodesFuture.thenCombine(localStatesFuture, (dataNodes, 
localStatesMap) -> {
+                return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+                    Set<String> nodeConsistentIds = 
disasterRecoveryManager.dzManager.logicalTopology(msRevision)
+                            .stream()
+                            .map(NodeWithAttributes::nodeName)
+                            .collect(toSet());
 
-            List<CompletableFuture<Void>> assignmentsUpdateFuts = new 
ArrayList<>(request.partitionIds().size());
+                    List<CompletableFuture<Void>> assignmentsUpdateFuts = new 
ArrayList<>(request.partitionIds().size());
 
-            for (Entry<Integer, Set<Integer>> partitionEntry : 
request.partitionIds().entrySet()) {
+                    for (Entry<Integer, Set<Integer>> partitionEntry : 
request.partitionIds().entrySet()) {
 
-                int[] partitionIdsArray = 
AssignmentUtil.partitionIds(partitionEntry.getValue(), 
zoneDescriptor.partitions());
+                        int[] partitionIdsArray = 
AssignmentUtil.partitionIds(partitionEntry.getValue(), 
zoneDescriptor.partitions());
 
-                assignmentsUpdateFuts.add(forceAssignmentsUpdate(
-                        partitionEntry.getKey(),
-                        zoneDescriptor,
-                        dataNodes,
-                        nodeConsistentIds,
-                        msRevision,
-                        msTimestamp,
-                        disasterRecoveryManager.metaStorageManager,
-                        localStatesMap,
-                        catalog.time(),
-                        partitionIdsArray,
-                        request.manualUpdate()
-                ));
-            }
+                        assignmentsUpdateFuts.add(forceAssignmentsUpdate(
+                                partitionEntry.getKey(),
+                                zoneDescriptor,
+                                dataNodes,
+                                nodeConsistentIds,
+                                msRevision,
+                                msTimestamp,
+                                disasterRecoveryManager.metaStorageManager,
+                                localStatesMap,
+                                catalog.time(),
+                                partitionIdsArray,
+                                request.manualUpdate(),
+                                disasterRecoveryManager
+                        ));
+                    }
 
-            return allOf(assignmentsUpdateFuts.toArray(new 
CompletableFuture[]{}));
-        })
-        .thenCompose(Function.identity())
-        .whenComplete((unused, throwable) -> {
-            // TODO: IGNITE-23635 Add fail handling for failed resetPeers
-            if (throwable != null) {
-                LOG.error("Failed to reset partition", throwable);
-            }
+                    return allOf(assignmentsUpdateFuts.toArray(new 
CompletableFuture[]{}));
+                }).whenComplete((unused, throwable) -> {
+                    // TODO: IGNITE-23635 Add fail handling for failed 
resetPeers
+                    if (throwable != null) {
+                        LOG.error("Failed to reset partition", throwable);
+                    }
+                });
+            }).thenCompose(Function.identity());
         });
     }
 
@@ -188,31 +192,35 @@ abstract class GroupUpdateRequestHandler<T extends 
PartitionGroupId> {
             Map<T, LocalPartitionStateMessageByNode> localStatesMap,
             long assignmentsTimestamp,
             int[] partitionIds,
-            boolean manualUpdate
+            boolean manualUpdate,
+            DisasterRecoveryManager disasterRecoveryManager
     ) {
-        CompletableFuture<Map<Integer, Assignments>> stableAssignments =
-                stableAssignments(metaStorageManager, replicationId, 
partitionIds);
-        return stableAssignments
-                .thenCompose(assignments -> {
-                    if (assignments.isEmpty()) {
-                        return nullCompletedFuture();
-                    }
-
-                    return updateAssignments(
-                            replicationId,
-                            zoneDescriptor,
-                            dataNodes,
-                            aliveNodesConsistentIds,
-                            revision,
-                            timestamp,
-                            metaStorageManager,
-                            localStatesMap,
-                            assignmentsTimestamp,
-                            partitionIds,
-                            assignments,
-                            manualUpdate
-                    );
-                });
+        return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+            CompletableFuture<Map<Integer, Assignments>> stableAssignments =
+                    stableAssignments(metaStorageManager, replicationId, 
partitionIds);
+            return stableAssignments
+                    .thenCompose(assignments -> 
inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+                        if (assignments.isEmpty()) {
+                            return nullCompletedFuture();
+                        }
+
+                        return updateAssignments(
+                                replicationId,
+                                zoneDescriptor,
+                                dataNodes,
+                                aliveNodesConsistentIds,
+                                revision,
+                                timestamp,
+                                metaStorageManager,
+                                localStatesMap,
+                                assignmentsTimestamp,
+                                partitionIds,
+                                assignments,
+                                manualUpdate,
+                                disasterRecoveryManager
+                        );
+                    }));
+        });
     }
 
     private CompletableFuture<Void> updateAssignments(
@@ -227,40 +235,44 @@ abstract class GroupUpdateRequestHandler<T extends 
PartitionGroupId> {
             long assignmentsTimestamp,
             int[] partitionIds,
             Map<Integer, Assignments> stableAssignments,
-            boolean manualUpdate
+            boolean manualUpdate,
+            DisasterRecoveryManager disasterRecoveryManager
     ) {
-        Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes, 
aliveNodesConsistentIds);
-
-        CompletableFuture<?>[] futures = new 
CompletableFuture[partitionIds.length];
-
-        for (int i = 0; i < partitionIds.length; i++) {
-            T replicaGrpId = replicationGroupId(replicationId, 
partitionIds[i]);
-            LocalPartitionStateMessageByNode localStatesByNode = 
localStatesMap.containsKey(replicaGrpId)
-                    ? localStatesMap.get(replicaGrpId)
-                    : new LocalPartitionStateMessageByNode(emptyMap());
-
-            futures[i] = partitionUpdate(
-                    replicaGrpId,
-                    aliveDataNodes,
-                    aliveNodesConsistentIds,
-                    zoneDescriptor.partitions(),
-                    zoneDescriptor.replicas(),
-                    zoneDescriptor.consensusGroupSize(),
-                    revision,
-                    timestamp,
-                    metaStorageManager,
-                    stableAssignments.get(replicaGrpId.partitionId()).nodes(),
-                    localStatesByNode,
-                    assignmentsTimestamp,
-                    manualUpdate
-            ).thenAccept(res -> {
-                DisasterRecoveryManager.LOG.info(
-                        "Partition {} returned {} status on reset attempt", 
replicaGrpId, UpdateStatus.valueOf(res)
-                );
-            });
-        }
+        return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+            Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes, 
aliveNodesConsistentIds);
+
+            CompletableFuture<?>[] futures = new 
CompletableFuture[partitionIds.length];
+
+            for (int i = 0; i < partitionIds.length; i++) {
+                T replicaGrpId = replicationGroupId(replicationId, 
partitionIds[i]);
+                LocalPartitionStateMessageByNode localStatesByNode = 
localStatesMap.containsKey(replicaGrpId)
+                        ? localStatesMap.get(replicaGrpId)
+                        : new LocalPartitionStateMessageByNode(emptyMap());
+
+                futures[i] = partitionUpdate(
+                        replicaGrpId,
+                        aliveDataNodes,
+                        aliveNodesConsistentIds,
+                        zoneDescriptor.partitions(),
+                        zoneDescriptor.replicas(),
+                        zoneDescriptor.consensusGroupSize(),
+                        revision,
+                        timestamp,
+                        metaStorageManager,
+                        
stableAssignments.get(replicaGrpId.partitionId()).nodes(),
+                        localStatesByNode,
+                        assignmentsTimestamp,
+                        manualUpdate,
+                        disasterRecoveryManager
+                ).thenAccept(res -> {
+                    DisasterRecoveryManager.LOG.info(
+                            "Partition {} returned {} status on reset 
attempt", replicaGrpId, UpdateStatus.valueOf(res)
+                    );
+                });
+            }
 
-        return allOf(futures);
+            return allOf(futures);
+        });
     }
 
     private CompletableFuture<Integer> partitionUpdate(
@@ -276,51 +288,54 @@ abstract class GroupUpdateRequestHandler<T extends 
PartitionGroupId> {
             Set<Assignment> currentAssignments,
             LocalPartitionStateMessageByNode localPartitionStateMessageByNode,
             long assignmentsTimestamp,
-            boolean manualUpdate
+            boolean manualUpdate,
+            DisasterRecoveryManager disasterRecoveryManager
     ) {
-        Set<Assignment> partAssignments = 
getAliveNodesWithData(aliveNodesConsistentIds, 
localPartitionStateMessageByNode);
-        Set<Assignment> aliveStableNodes = 
CollectionUtils.intersect(currentAssignments, partAssignments);
+        return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+            Set<Assignment> partAssignments = 
getAliveNodesWithData(aliveNodesConsistentIds, 
localPartitionStateMessageByNode);
+            Set<Assignment> aliveStableNodes = 
CollectionUtils.intersect(currentAssignments, partAssignments);
 
-        if (aliveStableNodes.size() >= (replicas / 2 + 1)) {
-            return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
-        }
+            if (aliveStableNodes.size() >= (replicas / 2 + 1)) {
+                return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
+            }
 
-        if (aliveStableNodes.isEmpty() && !manualUpdate) {
-            return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
-        }
+            if (aliveStableNodes.isEmpty() && !manualUpdate) {
+                return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
+            }
 
-        if (manualUpdate) {
-            enrichAssignments(partId, aliveDataNodes, partitions, replicas, 
consensusGroupSize, partAssignments);
-        }
+            if (manualUpdate) {
+                enrichAssignments(partId, aliveDataNodes, partitions, 
replicas, consensusGroupSize, partAssignments);
+            }
 
-        Assignment nextAssignment = 
nextAssignment(localPartitionStateMessageByNode, partAssignments);
+            Assignment nextAssignment = 
nextAssignment(localPartitionStateMessageByNode, partAssignments);
 
-        boolean isProposedPendingEqualsProposedPlanned = 
partAssignments.size() == 1;
+            boolean isProposedPendingEqualsProposedPlanned = 
partAssignments.size() == 1;
 
-        assert partAssignments.contains(nextAssignment) : 
IgniteStringFormatter.format(
-                "Recovery nodes set doesn't contain the reset node assignment 
[partAssignments={}, nextAssignment={}]",
-                partAssignments,
-                nextAssignment
-        );
+            assert partAssignments.contains(nextAssignment) : 
IgniteStringFormatter.format(
+                    "Recovery nodes set doesn't contain the reset node 
assignment [partAssignments={}, nextAssignment={}]",
+                    partAssignments,
+                    nextAssignment
+            );
 
-        // There are nodes with data, and we set pending assignments to this 
set of nodes. It'll be the source of peers for
-        // "resetPeers", and after that new assignments with restored replica 
factor wil be picked up from planned assignments
-        // for the case of the manual update, that was triggered by a user.
-        AssignmentsQueue assignmentsQueue = pendingAssignmentsCalculator()
-                .stable(Assignments.of(currentAssignments, 
assignmentsTimestamp))
-                .target(Assignments.forced(Set.of(nextAssignment), 
assignmentsTimestamp))
-                .toQueue();
-
-        return invoke(
-                partId,
-                revision,
-                timestamp,
-                metaStorageMgr,
-                assignmentsTimestamp,
-                assignmentsQueue,
-                isProposedPendingEqualsProposedPlanned,
-                partAssignments
-        );
+            // There are nodes with data, and we set pending assignments to 
this set of nodes. It'll be the source of peers for
+            // "resetPeers", and after that new assignments with restored 
replica factor will be picked up from planned assignments
+            // for the case of the manual update, that was triggered by a user.
+            AssignmentsQueue assignmentsQueue = pendingAssignmentsCalculator()
+                    .stable(Assignments.of(currentAssignments, 
assignmentsTimestamp))
+                    .target(Assignments.forced(Set.of(nextAssignment), 
assignmentsTimestamp))
+                    .toQueue();
+
+            return invoke(
+                    partId,
+                    revision,
+                    timestamp,
+                    metaStorageMgr,
+                    assignmentsTimestamp,
+                    assignmentsQueue,
+                    isProposedPendingEqualsProposedPlanned,
+                    partAssignments
+            );
+        });
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
index 4d3277a15bc..8bc008ba83a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
@@ -26,7 +26,7 @@ import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterReco
 import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequestType.MULTI_NODE;
 import static 
org.apache.ignite.internal.table.distributed.disaster.GroupUpdateRequestHandler.getAliveNodesWithData;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static 
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.RESTART_WITH_CLEAN_UP_ERR;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -49,7 +49,7 @@ import org.apache.ignite.internal.replicator.PartitionGroupId;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
-import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
+import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.NotEnoughAliveNodesException;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.CollectionUtils;
 
@@ -123,30 +123,33 @@ class ManualGroupRestartRequest implements 
DisasterRecoveryRequest {
 
     @Override
     public CompletableFuture<Void> handle(DisasterRecoveryManager 
disasterRecoveryManager, long revision, HybridTimestamp timestamp) {
-        if (!nodeNames.isEmpty() && 
!nodeNames.contains(disasterRecoveryManager.localNode().name())) {
-            return nullCompletedFuture();
-        }
+        return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+            if (!nodeNames.isEmpty() && 
!nodeNames.contains(disasterRecoveryManager.localNode().name())) {
+                return nullCompletedFuture();
+            }
 
-        Catalog catalog = 
disasterRecoveryManager.catalogManager.activeCatalog(timestamp.longValue());
-        CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+            Catalog catalog = 
disasterRecoveryManager.catalogManager.activeCatalog(timestamp.longValue());
+            CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
 
-        var restartFutures = new ArrayList<CompletableFuture<?>>();
+            var restartFutures = new ArrayList<CompletableFuture<?>>();
 
-        disasterRecoveryManager.raftManager.forEach((raftNodeId, 
raftGroupService) -> {
-            ReplicationGroupId replicationGroupId = raftNodeId.groupId();
+            disasterRecoveryManager.raftManager.forEach((raftNodeId, 
raftGroupService) -> {
+                ReplicationGroupId replicationGroupId = raftNodeId.groupId();
 
-            if (shouldProcessPartition(replicationGroupId, zoneDescriptor)) {
-                if (cleanUp) {
-                    restartFutures.add(
-                            
createRestartWithCleanupFuture(disasterRecoveryManager, replicationGroupId, 
revision, zoneDescriptor, catalog)
-                    );
-                } else {
-                    
restartFutures.add(createRestartFuture(disasterRecoveryManager, 
replicationGroupId, revision));
+                if (shouldProcessPartition(replicationGroupId, 
zoneDescriptor)) {
+                    if (cleanUp) {
+                        restartFutures.add(
+                                
createRestartWithCleanupFuture(disasterRecoveryManager, replicationGroupId, 
revision, zoneDescriptor,
+                                        catalog)
+                        );
+                    } else {
+                        
restartFutures.add(createRestartFuture(disasterRecoveryManager, 
replicationGroupId, revision));
+                    }
                 }
-            }
-        });
+            });
 
-        return restartFutures.isEmpty() ? nullCompletedFuture() : 
allOf(restartFutures.toArray(CompletableFuture[]::new));
+            return restartFutures.isEmpty() ? nullCompletedFuture() : 
allOf(restartFutures.toArray(CompletableFuture[]::new));
+        });
     }
 
     private boolean shouldProcessPartition(ReplicationGroupId 
replicationGroupId, CatalogZoneDescriptor zoneDescriptor) {
@@ -216,38 +219,37 @@ class ManualGroupRestartRequest implements 
DisasterRecoveryRequest {
             CatalogZoneDescriptor zoneDescriptor,
             Catalog catalog
     ) {
-        if (zoneDescriptor.consistencyMode() == 
ConsistencyMode.HIGH_AVAILABILITY) {
-            if (zoneDescriptor.replicas() >= 2) {
-                return createCleanupRestartFuture(disasterRecoveryManager, 
replicationGroupId, revision);
-            } else {
-                return notEnoughAliveNodes();
-            }
-        } else {
-            if (zoneDescriptor.replicas() <= 2) {
-                return notEnoughAliveNodes();
-            }
-
-            return enoughAliveNodesToRestartWithCleanUp(
-                    disasterRecoveryManager,
-                    revision,
-                    replicationGroupId,
-                    zoneDescriptor,
-                    catalog
-            ).thenCompose(enoughNodes -> {
-                if (enoughNodes) {
+        return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+            if (zoneDescriptor.consistencyMode() == 
ConsistencyMode.HIGH_AVAILABILITY) {
+                if (zoneDescriptor.replicas() >= 2) {
                     return createCleanupRestartFuture(disasterRecoveryManager, 
replicationGroupId, revision);
                 } else {
                     return notEnoughAliveNodes();
                 }
-            });
-        }
+            } else {
+                if (zoneDescriptor.replicas() <= 2) {
+                    return notEnoughAliveNodes();
+                }
+
+                return enoughAliveNodesToRestartWithCleanUp(
+                        disasterRecoveryManager,
+                        revision,
+                        replicationGroupId,
+                        zoneDescriptor,
+                        catalog
+                ).thenCompose(enoughNodes -> {
+                    if (enoughNodes) {
+                        return 
createCleanupRestartFuture(disasterRecoveryManager, replicationGroupId, 
revision);
+                    } else {
+                        return notEnoughAliveNodes();
+                    }
+                });
+            }
+        });
     }
 
     private static <U> CompletableFuture<U> notEnoughAliveNodes() {
-        return CompletableFuture.failedFuture(
-                new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not 
enough alive nodes "
-                        + "to perform reset with clean up.")
-        );
+        return CompletableFuture.failedFuture(new 
NotEnoughAliveNodesException());
     }
 
     private static CompletableFuture<Boolean> 
enoughAliveNodesToRestartWithCleanUp(
@@ -303,29 +305,33 @@ class ManualGroupRestartRequest implements 
DisasterRecoveryRequest {
             Function<LocalPartitionStateMessage, T> keyExtractor,
             CompletableFuture<Map<Integer, Assignments>> stableAssignments
     ) {
-        Set<String> aliveNodesConsistentIds = 
disasterRecoveryManager.dzManager.logicalTopology(msRevision)
-                .stream()
-                .map(NodeWithAttributes::nodeName)
-                .collect(Collectors.toSet());
-
-        CompletableFuture<Map<T, LocalPartitionStateMessageByNode>> 
localStatesFuture =
-                disasterRecoveryManager.localPartitionStatesInternal(
-                        Set.of(zoneDescriptor.name()),
-                        emptySet(),
-                        Set.of(partitionGroupId.partitionId()),
-                        catalog,
-                        keyExtractor
-                );
+        return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+            Set<String> aliveNodesConsistentIds = 
disasterRecoveryManager.dzManager.logicalTopology(msRevision)
+                    .stream()
+                    .map(NodeWithAttributes::nodeName)
+                    .collect(Collectors.toSet());
+
+            CompletableFuture<Map<T, LocalPartitionStateMessageByNode>> 
localStatesFuture =
+                    disasterRecoveryManager.localPartitionStatesInternal(
+                            Set.of(zoneDescriptor.name()),
+                            emptySet(),
+                            Set.of(partitionGroupId.partitionId()),
+                            catalog,
+                            keyExtractor
+                    );
 
-        return localStatesFuture.thenCombine(stableAssignments, 
(localPartitionStatesMap, currentAssignments) -> {
-            LocalPartitionStateMessageByNode localPartitionStateMessageByNode 
= localPartitionStatesMap.get(partitionGroupId);
+            return localStatesFuture.thenCombine(stableAssignments, 
(localPartitionStatesMap, currentAssignments) -> {
+                return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+                    LocalPartitionStateMessageByNode 
localPartitionStateMessageByNode = 
localPartitionStatesMap.get(partitionGroupId);
 
-            Set<Assignment> partAssignments = 
getAliveNodesWithData(aliveNodesConsistentIds, 
localPartitionStateMessageByNode);
-            Set<Assignment> currentStableAssignments = 
currentAssignments.get(partitionGroupId.partitionId()).nodes();
+                    Set<Assignment> partAssignments = 
getAliveNodesWithData(aliveNodesConsistentIds, 
localPartitionStateMessageByNode);
+                    Set<Assignment> currentStableAssignments = 
currentAssignments.get(partitionGroupId.partitionId()).nodes();
 
-            Set<Assignment> aliveStableNodes = 
CollectionUtils.intersect(currentStableAssignments, partAssignments);
+                    Set<Assignment> aliveStableNodes = 
CollectionUtils.intersect(currentStableAssignments, partAssignments);
 
-            return aliveStableNodes.size() > (zoneDescriptor.replicas() / 2 + 
1);
+                    return aliveStableNodes.size() > 
(zoneDescriptor.replicas() / 2 + 1);
+                });
+            });
         });
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/IllegalNodesException.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/IllegalNodesException.java
new file mode 100644
index 00000000000..836d8ac0f4a
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/IllegalNodesException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
+
+import static 
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.ILLEGAL_NODES_SET_ERR;
+
+/** Exception is thrown when zero or more than one node name is specified for 
the operation. */
+public class IllegalNodesException extends DisasterRecoveryException {
+    private static final long serialVersionUID = -7959192788912267028L;
+
+    /** Creates an exception stating that exactly one node name is required. */
+    public IllegalNodesException() {
+        super(ILLEGAL_NODES_SET_ERR, "Only one node name should be specified 
for the operation.");
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NotEnoughAliveNodesException.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NotEnoughAliveNodesException.java
new file mode 100644
index 00000000000..1a1064529fa
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NotEnoughAliveNodesException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
+
+import static 
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.NOT_ENOUGH_ALIVE_NODES_ERR;
+
+/**
+ * Exception is thrown when there are not enough alive nodes to perform reset 
with clean up.
+ */
+public class NotEnoughAliveNodesException extends DisasterRecoveryException {
+    private static final long serialVersionUID = -8864260391821773729L;
+
+    public NotEnoughAliveNodesException() {
+        super(NOT_ENOUGH_ALIVE_NODES_ERR, "Not enough alive nodes to perform 
reset with clean up.");
+    }
+}


Reply via email to