This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f477c5550be IGNITE-27258 Wait for all nodes in MULTI_NODE disaster 
recovery requests (#7156)
f477c5550be is described below

commit f477c5550be850a1dab0374900084a846f7e9795
Author: Phillippko <[email protected]>
AuthorDate: Fri Dec 26 13:00:02 2025 +0700

    IGNITE-27258 Wait for all nodes in MULTI_NODE disaster recovery requests 
(#7156)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   3 +
 .../recovery/restart/RestartPartitionsCall.java    |   3 +-
 .../org/apache/ignite/internal/util/Constants.java |   3 +
 .../network/PartitionReplicationMessageGroup.java  |  10 +-
 .../disaster/OperationCompletedMessage.java        |  34 +++++
 modules/platforms/cpp/ignite/common/error_codes.h  |   1 +
 modules/platforms/cpp/ignite/odbc/common_types.cpp |   1 +
 .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs |   3 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |   1 +
 .../disaster/DisasterRecoveryManager.java          | 163 ++++++++++++++++++---
 .../disaster/ManualGroupRestartRequest.java        |  30 +++-
 .../ManualGroupRestartRequestSerializer.java       |  16 +-
 .../MultiNodeDisasterRecoveryRequest.java}         |  25 ++--
 .../distributed/disaster/MultiNodeOperations.java  |  76 ++++++++++
 .../disaster/exceptions/NodeLeftException.java}    |  22 +--
 .../exceptions/RemoteOperationException.java}      |  26 ++--
 .../DisasterRecoveryRequestSerializerTest.java     |  30 +++-
 .../disaster/ItDisasterRecoveryManagerTest.java    |   5 +-
 18 files changed, 375 insertions(+), 77 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index d029282b65f..02a7d4737d4 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -750,6 +750,9 @@ public class ErrorGroups {
 
         /** Error when forwarding disaster recovery request to another node 
failed. */
         public static final int REQUEST_FORWARD_ERR = 
RECOVERY_ERR_GROUP.registerErrorCode((short) 7);
+
+        /** Error when multi node operation fails on any node. */
+        public static final int REMOTE_NODE_ERR = 
RECOVERY_ERR_GROUP.registerErrorCode((short) 8);
     }
 
     /** Embedded API error group. */
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/restart/RestartPartitionsCall.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/restart/RestartPartitionsCall.java
index c45b98a5016..63ca9eb0ac1 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/restart/RestartPartitionsCall.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/restart/RestartPartitionsCall.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.cli.call.recovery.restart;
 
+import static 
org.apache.ignite.internal.util.Constants.DISASTER_RECOVERY_TIMEOUT_MILLIS;
 import static org.apache.ignite.internal.util.StringUtils.nullOrEmpty;
 
 import jakarta.inject.Singleton;
@@ -40,7 +41,7 @@ public class RestartPartitionsCall implements 
Call<RestartPartitionsCallInput, S
 
     @Override
     public DefaultCallOutput<String> execute(RestartPartitionsCallInput input) 
{
-        RecoveryApi client = new 
RecoveryApi(clientFactory.getClient(input.clusterUrl()));
+        RecoveryApi client = new 
RecoveryApi(clientFactory.getClient(input.clusterUrl()).setReadTimeout(DISASTER_RECOVERY_TIMEOUT_MILLIS));
 
         try {
             if (nullOrEmpty(input.tableName())) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java 
b/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
index 0467ecc52cb..7f385318e07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
@@ -30,6 +30,9 @@ public final class Constants {
     /** Bytes in giga-byte (IEC 80000-13). */
     public static final int GiB = 1024 * MiB;
 
+    /** Disaster recovery operations timeout in milliseconds. */
+    public static final int DISASTER_RECOVERY_TIMEOUT_MILLIS = 30_000;
+
     /** Stub. */
     private Constants() {
         // Noop.
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
index 51c154310c3..3ea3d7b7b12 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
@@ -33,12 +33,15 @@ import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCom
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommandV2;
+import 
org.apache.ignite.internal.partition.replicator.network.disaster.DisasterRecoveryRequestMessage;
+import 
org.apache.ignite.internal.partition.replicator.network.disaster.DisasterRecoveryResponseMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesResponse;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateResponse;
+import 
org.apache.ignite.internal.partition.replicator.network.disaster.OperationCompletedMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.message.HasDataRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.message.HasDataResponse;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
@@ -307,10 +310,13 @@ public interface PartitionReplicationMessageGroup {
         /** Message type for {@link LocalTablePartitionStateResponse}. */
         short LOCAL_TABLE_PARTITION_STATE_RESPONSE = 105;
 
-        /** Message type for disaster recovery request forwarding. */
+        /** Message type for {@link DisasterRecoveryRequestMessage}. */
         short DISASTER_RECOVERY_REQUEST = 106;
 
-        /** Message type for disaster recovery request forwarding response. */
+        /** Message type for {@link DisasterRecoveryResponseMessage}. */
         short DISASTER_RECOVERY_RESPONSE = 111;
+
+        /** Message type for {@link OperationCompletedMessage}. */
+        short OPERATION_COMPLETED = 112;
     }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/OperationCompletedMessage.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/OperationCompletedMessage.java
new file mode 100644
index 00000000000..378d9b28921
--- /dev/null
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/OperationCompletedMessage.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.disaster;
+
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Notification sent to the initiator of a multi node disaster recovery request after a node has finished processing the operation,
+ * successfully or not.
+ */
+@Transferable(DisasterRecoveryMessages.OPERATION_COMPLETED)
+public interface OperationCompletedMessage extends NetworkMessage {
+    /** ID of the completed operation. */
+    UUID operationId();
+
+    /** Exception message if the operation failed, {@code null} if the operation completed successfully. */
+    @Nullable String exceptionMessage();
+}
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h 
b/modules/platforms/cpp/ignite/common/error_codes.h
index 30a6a22de7b..232a6fbace7 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -228,6 +228,7 @@ enum class code : underlying_t {
     NOT_ENOUGH_ALIVE_NODES = 0x140005,
     ILLEGAL_NODES_SET = 0x140006,
     REQUEST_FORWARD = 0x140007,
+    REMOTE_NODE = 0x140008,
 
     // Embedded group. Group code: 21
     CLUSTER_NOT_INITIALIZED = 0x150001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp 
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index f76ee1e4a74..d2beb8349d4 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -314,6 +314,7 @@ sql_state error_code_to_sql_state(error::code code) {
         case error::code::NOT_ENOUGH_ALIVE_NODES:
         case error::code::ILLEGAL_NODES_SET:
         case error::code::REQUEST_FORWARD:
+        case error::code::REMOTE_NODE:
             return sql_state::SHY000_GENERAL_ERROR;
 
         // Embedded group. Group code: 21
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs 
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index a8b52b90ad9..ced201737df 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -729,6 +729,9 @@ namespace Apache.Ignite
 
             /// <summary> RequestForward error. </summary>
             public const int RequestForward = (GroupCode << 16) | (7 & 0xFFFF);
+
+            /// <summary> RemoteNode error. </summary>
+            public const int RemoteNode = (GroupCode << 16) | (8 & 0xFFFF);
         }
 
         /// <summary> Embedded errors. </summary>
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 1898f1281fd..768623cbdac 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1171,6 +1171,7 @@ public class IgniteImpl implements Ignite {
                 distributionZoneManager,
                 raftMgr,
                 clusterSvc.topologyService(),
+                logicalTopologyService,
                 distributedTblMgr,
                 metricManager,
                 failureManager,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index ec91f4b80f4..eef12c526e4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -21,6 +21,7 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.toMap;
 import static java.util.stream.Collectors.toSet;
@@ -45,6 +46,7 @@ import static 
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
 import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static 
org.apache.ignite.internal.util.Constants.DISASTER_RECOVERY_TIMEOUT_MILLIS;
 import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static 
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.PARTITION_STATE_ERR;
@@ -61,7 +63,6 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.Catalog;
@@ -72,6 +73,10 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
 import org.apache.ignite.internal.distributionzones.DistributionZonesUtil;
 import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
@@ -93,6 +98,7 @@ import 
org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.network.ChannelType;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
@@ -111,6 +117,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPar
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateRequest;
 import 
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateResponse;
+import 
org.apache.ignite.internal.partition.replicator.network.disaster.OperationCompletedMessage;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.raft.Loza;
@@ -130,6 +137,7 @@ import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.Disaster
 import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryRequestForwardException;
 import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalNodesException;
 import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalPartitionIdException;
+import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.NodeLeftException;
 import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.NodesNotFoundException;
 import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.NotEnoughAliveNodesException;
 import org.apache.ignite.internal.table.distributed.storage.NullMvTableStorage;
@@ -168,9 +176,6 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
 
     private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
 
-    /** Disaster recovery operations timeout in seconds. */
-    private static final int TIMEOUT_SECONDS = 30;
-
     /**
      * Maximal allowed difference between committed index on the leader and on 
the follower, that differentiates
      * {@link LocalPartitionStateEnum#HEALTHY} from {@link 
LocalPartitionStateEnum#CATCHING_UP}.
@@ -201,9 +206,13 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
     /** Cluster physical topology service.  */
     private final TopologyService topologyService;
 
+    private final LogicalTopologyService logicalTopology;
+
     /** Watch listener for {@link #RECOVERY_TRIGGER_KEY}. */
     private final WatchListener watchListener;
 
+    private final LogicalTopologyEventListener nodeLeftListener;
+
     /** Table manager. */
     final TableManager tableManager;
 
@@ -226,6 +235,8 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
 
     private final Map<Integer, PartitionStatesMetricSource> 
metricSourceByTableId = new ConcurrentHashMap<>();
 
+    private final Map<UUID, MultiNodeOperations> operationsByNodeId = new 
ConcurrentHashMap<>();
+
     /** Constructor. */
     public DisasterRecoveryManager(
             ExecutorService threadPool,
@@ -235,6 +246,7 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             DistributionZoneManager dzManager,
             Loza raftManager,
             TopologyService topologyService,
+            LogicalTopologyService logicalTopology,
             TableManager tableManager,
             MetricManager metricManager,
             FailureManager failureManager,
@@ -248,6 +260,7 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
         this.dzManager = dzManager;
         this.raftManager = raftManager;
         this.topologyService = topologyService;
+        this.logicalTopology = logicalTopology;
         this.tableManager = tableManager;
         this.metricManager = metricManager;
         this.failureManager = failureManager;
@@ -260,6 +273,19 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             // There is no need to block a watch thread any longer.
             return nullCompletedFuture();
         });
+
+        nodeLeftListener = new LogicalTopologyEventListener() {
+            /**
+             * Fails every multi node operation that is still waiting for the node that left the logical topology.
+             *
+             * @param leftNode Node that left the logical topology.
+             * @param newTopology Topology snapshot after the node has left (not used).
+             */
+            @Override
+            public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) {
+                // Detach the mapping first and complete the futures outside of the map operation. The futures registered in
+                // addMultiNodeOperation() carry a cleanup callback that calls operationsByNodeId.compute() for this very key;
+                // completing them from inside a compute() remapping function would be a recursive update, which
+                // ConcurrentHashMap does not allow.
+                MultiNodeOperations operations = operationsByNodeId.remove(leftNode.id());
+
+                if (operations != null) {
+                    operations.completeAllExceptionally(leftNode.name(), new NodeLeftException(leftNode.name(), leftNode.id()));
+                }
+            }
+        };
     }
 
     @Override
@@ -279,6 +305,8 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
 
             registerMetricSources();
 
+            logicalTopology.addEventListener(nodeLeftListener);
+
             return nullCompletedFuture();
         });
     }
@@ -289,6 +317,8 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
 
         metaStorageManager.unregisterWatch(watchListener);
 
+        logicalTopology.removeEventListener(nodeLeftListener);
+
         for (CompletableFuture<Void> future : ongoingOperationsById.values()) {
             future.completeExceptionally(new NodeStoppingException());
         }
@@ -533,7 +563,8 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
                         partitionIds,
                         nodeNames,
                         catalog.time(),
-                        false
+                        false,
+                        localNode().name()
                 ));
             } catch (Throwable t) {
                 return failedFuture(t);
@@ -580,7 +611,8 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
                         partitionIds,
                         nodeNames,
                         catalog.time(),
-                        true
+                        true,
+                        localNode().name()
                 ));
             } catch (Throwable t) {
                 return failedFuture(t);
@@ -622,7 +654,8 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
                         partitionIds,
                         nodeNames,
                         catalog.time(),
-                        false
+                        false,
+                        localNode().name()
                 ));
             } catch (Throwable t) {
                 return failedFuture(t);
@@ -666,7 +699,8 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
                         partitionIds,
                         nodeNames,
                         catalog.time(),
-                        true
+                        true,
+                        localNode().name()
                 ));
             } catch (Throwable t) {
                 return failedFuture(t);
@@ -769,7 +803,7 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
                 CompletableFuture<NetworkMessage> invokeFuture = 
messagingService.invoke(
                         node.nodeName(),
                         localPartitionStatesRequest,
-                        TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS)
+                        DISASTER_RECOVERY_TIMEOUT_MILLIS
                 );
 
                 futures[i++] = invokeFuture.thenAccept(networkMessage -> {
@@ -968,7 +1002,7 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
                 .catalogVersion(catalogVersion)
                 .build();
 
-        return messagingService.invoke(node, request, 
TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS))
+        return messagingService.invoke(node, request, 
DISASTER_RECOVERY_TIMEOUT_MILLIS)
                 .thenApply(networkMessage -> {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Got response from node [nodeName={}, 
networkMessage={}]", node, networkMessage);
@@ -1108,10 +1142,10 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
      */
     private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest 
request, long revision) {
         // Check if this is a manual restart request with specific nodes, and 
forward to the first node if needed.
-        if (request instanceof ManualGroupRestartRequest) {
-            ManualGroupRestartRequest restartRequest = 
(ManualGroupRestartRequest) request;
+        if (request instanceof MultiNodeDisasterRecoveryRequest) {
+            MultiNodeDisasterRecoveryRequest multiNodeRequest = 
(MultiNodeDisasterRecoveryRequest) request;
 
-            Set<String> nodeNames = restartRequest.nodeNames();
+            Set<String> nodeNames = multiNodeRequest.nodeNames();
 
             if (!nodeNames.isEmpty()) {
                 String firstNode = nodeNames.stream()
@@ -1123,14 +1157,16 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
 
                 // If this is not the target node, forward the request and 
return its response.
                 if (!firstNode.equals(localNodeName)) {
-                    return forwardDisasterRecoveryRequest(request, revision, 
firstNode);
+                    return forwardDisasterRecoveryRequest(multiNodeRequest, 
revision, firstNode);
                 }
             }
         }
 
         UUID operationId = request.operationId();
 
-        CompletableFuture<Void> operationFuture = new 
CompletableFuture<Void>().orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        CompletableFuture<Void> operationFuture = new 
CompletableFuture<Void>().orTimeout(DISASTER_RECOVERY_TIMEOUT_MILLIS, 
MILLISECONDS);
+
+        CompletableFuture<Void> remoteProcessingFuture = 
remoteProcessingFuture(request);
 
         operationFuture.whenComplete((v, throwable) -> 
ongoingOperationsById.remove(operationId));
 
@@ -1151,7 +1187,69 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest);
         }
 
-        return operationFuture;
+        return operationFuture.thenCompose(v -> remoteProcessingFuture);
+    }
+
+    /**
+     * Builds a future that completes once all per-node futures registered for a multi node request have completed.
+     * For a request of any other type an already completed future is returned.
+     */
+    private CompletableFuture<Void> remoteProcessingFuture(DisasterRecoveryRequest request) {
+        if (request.type() == DisasterRecoveryRequestType.MULTI_NODE) {
+            UUID id = request.operationId();
+
+            Set<String> requestedNodeNames = ((MultiNodeDisasterRecoveryRequest) request).nodeNames();
+
+            Set<NodeWithAttributes> targetNodes = getRequestNodes(requestedNodeNames);
+
+            return allOf(
+                    targetNodes.stream()
+                            .map(targetNode -> addMultiNodeOperation(targetNode, id))
+                            .toArray(CompletableFuture[]::new)
+            );
+        }
+
+        return nullCompletedFuture();
+    }
+
+    /** Returns logical topology nodes whose names are listed in the request; an empty set of names selects every node. */
+    private Set<NodeWithAttributes> getRequestNodes(Set<String> requestNodeNames) {
+        Set<NodeWithAttributes> topologyNodes = dzManager.logicalTopology();
+
+        return topologyNodes.stream()
+                .filter(candidate -> requestNodeNames.isEmpty() || requestNodeNames.contains(candidate.nodeName()))
+                .collect(toSet());
+    }
+
+    /**
+     * Registers a future that tracks completion of the given operation on the given node.
+     *
+     * <p>The future fails with a timeout after {@code DISASTER_RECOVERY_TIMEOUT_MILLIS}, and fails right away with
+     * {@link NodeLeftException} if the node is not present in the logical topology at registration time. Once the future
+     * completes in any way, the operation is removed from the per-node registry.
+     *
+     * @param node Node the operation is expected to complete on.
+     * @param operationId ID of the operation.
+     * @return Future that completes together with the tracked operation, after the registry cleanup has run.
+     */
+    private CompletableFuture<Void> addMultiNodeOperation(NodeWithAttributes 
node, UUID operationId) {
+        CompletableFuture<Void> result = new 
CompletableFuture<Void>().orTimeout(DISASTER_RECOVERY_TIMEOUT_MILLIS, 
MILLISECONDS);
+
+        // The topology check and the registration are done inside a single compute() call, so they are atomic with respect
+        // to other updates of this node's entry. NOTE(review): presumably this is what keeps a registration from racing with
+        // the node-left handling for the same node - confirm.
+        operationsByNodeId.compute(node.nodeId(), (nodeId, operations) -> {
+            Set<UUID> nodes = dzManager.logicalTopology().stream()
+                    .map(NodeWithAttributes::nodeId)
+                    .collect(toSet());
+
+            if (!nodes.contains(nodeId)) {
+                result.completeExceptionally(new 
NodeLeftException(node.nodeName(), nodeId));
+
+                // Nothing is registered; the current mapping (possibly absent) is left as is.
+                return operations;
+            }
+
+            if (operations == null) {
+                operations = new MultiNodeOperations();
+            }
+
+            operations.add(operationId, result);
+
+            return operations;
+        });
+
+        // Cleanup operation on completion.
+        // The callback is attached only after the compute() above has returned, so for an already failed future it runs here
+        // and not inside the remapping function.
+        return result.whenComplete((v, e) ->
+                operationsByNodeId.compute(node.nodeId(), (nodeId, operations) 
-> {
+                    if (operations != null) {
+                        operations.remove(operationId);
+
+                        // Drop the whole entry when this was the last operation tracked for the node.
+                        return operations.isEmpty() ? null : operations;
+                    }
+
+                    return null;
+                }));
     }
 
     /**
@@ -1194,18 +1292,20 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
      * @return Future that completes when the forwarded request is processed.
      */
     private CompletableFuture<Void> forwardDisasterRecoveryRequest(
-            DisasterRecoveryRequest request,
+            MultiNodeDisasterRecoveryRequest request,
             long revision,
             String targetNodeName
     ) {
-        byte[] serializedRequest = VersionedSerialization.toBytes(request, 
DisasterRecoveryRequestSerializer.INSTANCE);
+        DisasterRecoveryRequest updatedCoordinator = 
request.updateCoordinator(targetNodeName);
+
+        byte[] serializedRequest = 
VersionedSerialization.toBytes(updatedCoordinator, 
DisasterRecoveryRequestSerializer.INSTANCE);
 
         DisasterRecoveryRequestMessage message = 
PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryRequestMessage()
                 .requestBytes(serializedRequest)
                 .revision(revision)
                 .build();
 
-        return messagingService.invoke(targetNodeName, message, 
TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS))
+        return messagingService.invoke(targetNodeName, message, 
DISASTER_RECOVERY_TIMEOUT_MILLIS)
                 .thenApply(responseMsg -> {
                     assert responseMsg instanceof 
DisasterRecoveryResponseMessage : responseMsg;
 
@@ -1272,6 +1372,19 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
                                 copyStateTo(operationFuture).accept(res, ex);
                             }
 
+                            MultiNodeDisasterRecoveryRequest multiNodeRequest 
= (MultiNodeDisasterRecoveryRequest) request;
+
+                            if (multiNodeRequest.coordinator() != null) {
+                                // The coordinator treats a null exception message as success, so a failure must always
+                                // produce a non-null message. Throwable#getMessage() may be null (for example, for the
+                                // TimeoutException created by CompletableFuture#orTimeout), hence the toString() fallback.
+                                String failureMessage = ex == null
+                                        ? null
+                                        : (ex.getMessage() != null ? ex.getMessage() : ex.toString());
+
+                                messagingService.send(
+                                        multiNodeRequest.coordinator(),
+                                        ChannelType.DEFAULT,
+                                        PARTITION_REPLICATION_MESSAGES_FACTORY.operationCompletedMessage()
+                                                .operationId(request.operationId())
+                                                .exceptionMessage(failureMessage)
+                                                .build()
+                                );
+                            }
+
                             if (ex != null) {
                                 if (!hasCause(
                                         ex,
@@ -1311,6 +1424,8 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
             handleLocalTableStateRequest((LocalTablePartitionStateRequest) 
message, sender, correlationId);
         } else if (message instanceof DisasterRecoveryRequestMessage) {
             handleDisasterRecoveryRequest((DisasterRecoveryRequestMessage) 
message, sender, correlationId);
+        } else if (message instanceof OperationCompletedMessage) {
+            handleOperationCompletedMessage((OperationCompletedMessage) 
message, sender);
         }
     }
 
@@ -1446,6 +1561,16 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
         }, threadPool);
     }
 
+    private void handleOperationCompletedMessage(
+            OperationCompletedMessage message,
+            InternalClusterNode sender
+    ) {
+        MultiNodeOperations multiNodeOperations = 
operationsByNodeId.get(sender.id());
+        if (multiNodeOperations != null) {
+            multiNodeOperations.complete(message.operationId(), sender.name(), 
message.exceptionMessage());
+        }
+    }
+
     private @Nullable LocalTablePartitionStateMessage 
handleSizeRequestForTablesInZone(
             Set<ZonePartitionId> requestedPartitions,
             ZonePartitionId zonePartitionId
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
index 0396572b4e2..5294e4506f8 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
@@ -50,8 +50,9 @@ import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.NotEnoughAliveNodesException;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.CollectionUtils;
+import org.jetbrains.annotations.Nullable;
 
-class ManualGroupRestartRequest implements DisasterRecoveryRequest {
+class ManualGroupRestartRequest implements MultiNodeDisasterRecoveryRequest {
     private final UUID operationId;
 
     private final int zoneId;
@@ -66,6 +67,9 @@ class ManualGroupRestartRequest implements 
DisasterRecoveryRequest {
 
     private final boolean cleanUp;
 
+    // Nullable for requests created before coordinator field introduction.
+    private final @Nullable String coordinator;
+
     ManualGroupRestartRequest(
             UUID operationId,
             int zoneId,
@@ -73,7 +77,8 @@ class ManualGroupRestartRequest implements 
DisasterRecoveryRequest {
             Set<Integer> partitionIds,
             Set<String> nodeNames,
             long assignmentsTimestamp,
-            boolean cleanUp
+            boolean cleanUp,
+            @Nullable String coordinator
     ) {
         this.operationId = operationId;
         this.zoneId = zoneId;
@@ -82,6 +87,7 @@ class ManualGroupRestartRequest implements 
DisasterRecoveryRequest {
         this.nodeNames = Set.copyOf(nodeNames);
         this.assignmentsTimestamp = assignmentsTimestamp;
         this.cleanUp = cleanUp;
+        this.coordinator = coordinator;
     }
 
     @Override
@@ -107,6 +113,7 @@ class ManualGroupRestartRequest implements 
DisasterRecoveryRequest {
         return partitionIds;
     }
 
+    @Override
     public Set<String> nodeNames() {
         return nodeNames;
     }
@@ -119,6 +126,25 @@ class ManualGroupRestartRequest implements 
DisasterRecoveryRequest {
         return cleanUp;
     }
 
+    @Override
+    public @Nullable String coordinator() {
+        return coordinator;
+    }
+
+    @Override
+    public MultiNodeDisasterRecoveryRequest updateCoordinator(String 
newCoordinatorName) {
+        return new ManualGroupRestartRequest(
+                operationId,
+                zoneId,
+                tableId,
+                partitionIds,
+                nodeNames,
+                assignmentsTimestamp,
+                cleanUp,
+                newCoordinatorName
+        );
+    }
+
     @Override
     public CompletableFuture<Void> handle(DisasterRecoveryManager 
disasterRecoveryManager, long revision, HybridTimestamp timestamp) {
         return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
index 2cd851c6af4..32159225683 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
@@ -20,12 +20,14 @@ package 
org.apache.ignite.internal.table.distributed.disaster;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 
 import java.io.IOException;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.util.io.IgniteDataInput;
 import org.apache.ignite.internal.util.io.IgniteDataOutput;
 import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * {@link VersionedSerializer} for {@link ManualGroupRestartRequest} instances.
@@ -36,11 +38,13 @@ class ManualGroupRestartRequestSerializer extends 
VersionedSerializer<ManualGrou
 
     @Override
     protected byte getProtocolVersion() {
-        return 2;
+        return 3;
     }
 
     @Override
     protected void writeExternalData(ManualGroupRestartRequest request, 
IgniteDataOutput out) throws IOException {
+        Objects.requireNonNull(request.coordinator(), "Coordinator must not be 
null");
+
         out.writeUuid(request.operationId());
         out.writeVarInt(request.zoneId());
         out.writeVarInt(request.tableId());
@@ -48,6 +52,7 @@ class ManualGroupRestartRequestSerializer extends 
VersionedSerializer<ManualGrou
         writeStringSet(request.nodeNames(), out);
         hybridTimestamp(request.assignmentsTimestamp()).writeTo(out);
         out.writeBoolean(request.cleanUp()); // Write the new 'cleanUp' field 
introduced in protocol version 2.
+        out.writeUTF(request.coordinator()); // Write the new 'coordinator' 
field introduced in protocol version 3.
     }
 
     @Override
@@ -65,6 +70,8 @@ class ManualGroupRestartRequestSerializer extends 
VersionedSerializer<ManualGrou
             cleanUp = in.readBoolean(); // Read the new 'cleanUp' field if 
protocol version is 2 or greater.
         }
 
+        String coordinator = readCoordinator(protoVer, in);
+
         return new ManualGroupRestartRequest(
                 operationId,
                 zoneId,
@@ -72,7 +79,12 @@ class ManualGroupRestartRequestSerializer extends 
VersionedSerializer<ManualGrou
                 partitionIds,
                 nodeNames,
                 assignmentsTimestamp.longValue(),
-                cleanUp
+                cleanUp,
+                coordinator
         );
     }
+
+    private static @Nullable String readCoordinator(byte protoVer, 
IgniteDataInput in) throws IOException {
+        return protoVer >= 3 ? in.readUTF() : null;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeDisasterRecoveryRequest.java
similarity index 54%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
copy to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeDisasterRecoveryRequest.java
index 0467ecc52cb..bc46836de83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeDisasterRecoveryRequest.java
@@ -15,23 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.util;
+package org.apache.ignite.internal.table.distributed.disaster;
 
-/**
- * Utility class with magic constants.
- */
-public final class Constants {
-    /** Bytes in kilo-byte  (IEC 80000-13). */
-    public static final int KiB = 1024;
+import java.util.Set;
 
-    /** Bytes in mega-byte (IEC 80000-13). */
-    public static final int MiB = 1024 * KiB;
+/** {@link DisasterRecoveryRequest} that should be handled by multiple nodes. 
*/
+public interface MultiNodeDisasterRecoveryRequest extends 
DisasterRecoveryRequest {
+    /** Returns names of nodes involved in the recovery or empty set if all 
nodes should be used. */
+    Set<String> nodeNames();
 
-    /** Bytes in giga-byte (IEC 80000-13). */
-    public static final int GiB = 1024 * MiB;
+    /** Returns the name of the node that initiated the request. {@code null} 
for requests created before field introduction. */
+    String coordinator();
 
-    /** Stub. */
-    private Constants() {
-        // Noop.
-    }
+    /** Returns a new request with updated coordinator name. */
+    MultiNodeDisasterRecoveryRequest updateCoordinator(String 
newCoordinatorName);
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeOperations.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeOperations.java
new file mode 100644
index 00000000000..d4d493b2efb
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeOperations.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import 
org.apache.ignite.internal.table.distributed.disaster.exceptions.RemoteOperationException;
+import org.jetbrains.annotations.Nullable;
+
+/** Contains operations that should be processed by a remote node. */
+class MultiNodeOperations {
+    private final Map<UUID, CompletableFuture<Void>> operationsById = new 
ConcurrentHashMap<>();
+
+    /** Adds new operation to track. */
+    void add(UUID operationId, CompletableFuture<Void> operationFuture) {
+        operationsById.put(operationId, operationFuture);
+    }
+
+    /**
+     * Removes operation tracking.
+     *
+     * @return Removed operation future.
+     */
+    CompletableFuture<Void> remove(UUID operationId) {
+        return operationsById.remove(operationId);
+    }
+
+    /** Completes all tracked operations with a given exception. */
+    void completeAllExceptionally(String nodeName, Throwable e) {
+        Set<UUID> operationIds = Set.copyOf(operationsById.keySet());
+
+        for (UUID operationId : operationIds) {
+            CompletableFuture<Void> operation = 
operationsById.remove(operationId);
+
+            if (operation != null) {
+                operation.completeExceptionally(new 
RemoteOperationException(e.getMessage(), nodeName));
+            }
+        }
+    }
+
+    /** Completes operation successfully or with {@link 
RemoteOperationException} using provided nodeName and exception message. */
+    public void complete(UUID operationId, String nodeName, @Nullable String 
exceptionMessage) {
+        CompletableFuture<Void> operationFuture = 
operationsById.remove(operationId);
+
+        if (operationFuture != null) {
+            if (exceptionMessage != null) {
+                operationFuture.completeExceptionally(new 
RemoteOperationException(exceptionMessage, nodeName));
+            } else {
+                operationFuture.complete(null);
+            }
+        }
+    }
+
+    /** Returns {@code true} if there are no ongoing operations. */
+    public boolean isEmpty() {
+        return operationsById.isEmpty();
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NodeLeftException.java
similarity index 58%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
copy to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NodeLeftException.java
index 0467ecc52cb..b84f575ce45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NodeLeftException.java
@@ -15,23 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.util;
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
 
-/**
- * Utility class with magic constants.
- */
-public final class Constants {
-    /** Bytes in kilo-byte  (IEC 80000-13). */
-    public static final int KiB = 1024;
+import static 
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.REMOTE_NODE_ERR;
 
-    /** Bytes in mega-byte (IEC 80000-13). */
-    public static final int MiB = 1024 * KiB;
+import java.util.UUID;
 
-    /** Bytes in giga-byte (IEC 80000-13). */
-    public static final int GiB = 1024 * MiB;
+/** Exception thrown when a node leaves before finishing a multi-node recovery 
request. */
+public class NodeLeftException extends DisasterRecoveryException {
+    private static final long serialVersionUID = -6295004626426857229L;
 
-    /** Stub. */
-    private Constants() {
-        // Noop.
+    public NodeLeftException(String nodeName, UUID nodeId) {
+        super(REMOTE_NODE_ERR, "Node left logical topology [name=" + nodeName 
+ ", id=" + nodeId + "]");
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/RemoteOperationException.java
similarity index 56%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
copy to 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/RemoteOperationException.java
index 0467ecc52cb..db3a5f592e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/RemoteOperationException.java
@@ -15,23 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.util;
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
 
-/**
- * Utility class with magic constants.
- */
-public final class Constants {
-    /** Bytes in kilo-byte  (IEC 80000-13). */
-    public static final int KiB = 1024;
-
-    /** Bytes in mega-byte (IEC 80000-13). */
-    public static final int MiB = 1024 * KiB;
+import org.apache.ignite.lang.ErrorGroups.DisasterRecovery;
 
-    /** Bytes in giga-byte (IEC 80000-13). */
-    public static final int GiB = 1024 * MiB;
+/** Exception thrown when a remote node encounters an error while executing a 
disaster recovery operation. */
+public class RemoteOperationException extends DisasterRecoveryException {
+    private static final long serialVersionUID = 1L;
 
-    /** Stub. */
-    private Constants() {
-        // Noop.
+    /** Constructor. */
+    public RemoteOperationException(String message, String nodeName) {
+        super(
+                DisasterRecovery.REMOTE_NODE_ERR,
+                "Processing error on node " + nodeName + " during disaster 
recovery: " + message
+        );
     }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
index 510c410b12a..0d5100ff6d5 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
@@ -33,6 +33,8 @@ class DisasterRecoveryRequestSerializerTest {
     private static final String GROUP_UPDATE_REQUEST_V2_BASE64 = 
"Ae++QwEC775D782rkHhWNBIhQ2WHCbrc/ukH0Q8DuRcEIBYMoR8EIBcqAQE=";
     private static final String MANUAL_GROUP_RESTART_REQUEST_V1_BASE64 = 
"Ae++QwIB775D782rkHhWNBIhQ2WHCbrc/tEPuRcEIBYMAwJiAmH///9///+AgAQ=";
     private static final String MANUAL_GROUP_RESTART_REQUEST_V2_BASE64 = 
"Ae++QwIC775D782rkHhWNBIhQ2WHCbrc/tEPuRcEFiAMAwJhAmL///9///+AgAQB";
+    private static final String MANUAL_GROUP_RESTART_REQUEST_V3_BASE64 = 
"Ae++QwID775D782rkHhWNBIhQ2WHCbrc/tEPuRcEDBYgAwJhAmL///9///+"
+            + "AgAQACW5vZGVOYW1l";
 
     private final DisasterRecoveryRequestSerializer serializer = new 
DisasterRecoveryRequestSerializer();
 
@@ -107,7 +109,8 @@ class DisasterRecoveryRequestSerializerTest {
                 Set.of(11, 21, 31),
                 Set.of("a", "b"),
                 HybridTimestamp.MAX_VALUE.longValue(),
-                false
+                false,
+                "nodeName"
         );
 
         byte[] bytes = VersionedSerialization.toBytes(originalRequest, 
serializer);
@@ -119,6 +122,7 @@ class DisasterRecoveryRequestSerializerTest {
         assertThat(restoredRequest.partitionIds(), is(Set.of(11, 21, 31)));
         assertThat(restoredRequest.nodeNames(), is(Set.of("a", "b")));
         assertThat(restoredRequest.assignmentsTimestamp(), 
is(HybridTimestamp.MAX_VALUE.longValue()));
+        assertThat(restoredRequest.coordinator(), is("nodeName"));
     }
 
     @Test
@@ -148,8 +152,23 @@ class DisasterRecoveryRequestSerializerTest {
         assertThat(restoredRequest.cleanUp(), is(true));
     }
 
+    @Test
+    void v3OfManualGroupRestartRequestCanBeDeserialized() {
+        byte[] bytes = 
Base64.getDecoder().decode(MANUAL_GROUP_RESTART_REQUEST_V3_BASE64);
+        ManualGroupRestartRequest restoredRequest = 
(ManualGroupRestartRequest) VersionedSerialization.fromBytes(bytes, serializer);
+
+        assertThat(restoredRequest.operationId(), is(new 
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
+        assertThat(restoredRequest.zoneId(), is(2000));
+        assertThat(restoredRequest.tableId(), is(3000));
+        assertThat(restoredRequest.partitionIds(), is(Set.of(11, 21, 31)));
+        assertThat(restoredRequest.nodeNames(), is(Set.of("a", "b")));
+        assertThat(restoredRequest.assignmentsTimestamp(), 
is(HybridTimestamp.MAX_VALUE.longValue()));
+        assertThat(restoredRequest.cleanUp(), is(false));
+        assertThat(restoredRequest.coordinator(), is("nodeName"));
+    }
+
     @SuppressWarnings("unused")
-    private String manualGroupRestartRequestV1Base64() {
+    private String manualGroupRestartRequestV3Base64() {
         var originalRequest = new ManualGroupRestartRequest(
                 new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L),
                 2000,
@@ -157,11 +176,12 @@ class DisasterRecoveryRequestSerializerTest {
                 Set.of(11, 21, 31),
                 Set.of("a", "b"),
                 HybridTimestamp.MAX_VALUE.longValue(),
-                false
+                false,
+                "nodeName"
         );
 
-        byte[] v1Bytes = VersionedSerialization.toBytes(originalRequest, 
serializer);
-        return Base64.getEncoder().encodeToString(v1Bytes);
+        byte[] v3Bytes = VersionedSerialization.toBytes(originalRequest, 
serializer);
+        return Base64.getEncoder().encodeToString(v3Bytes);
     }
 
     @SuppressWarnings("unused")
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index f81b2fc8582..f4d530fad07 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -32,6 +32,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -485,7 +486,7 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
 
         assertInstanceOf(DisasterRecoveryException.class, 
exception.getCause());
 
-        assertThat(exception.getCause().getMessage(), is("Not enough alive 
nodes to perform reset with clean up."));
+        assertThat(exception.getCause().getMessage(), containsString("Not 
enough alive nodes to perform reset with clean up."));
     }
 
     @Test
@@ -519,7 +520,7 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
 
         assertInstanceOf(DisasterRecoveryException.class, 
exception.getCause());
 
-        assertThat(exception.getCause().getMessage(), is("Not enough alive 
nodes to perform reset with clean up."));
+        assertThat(exception.getCause().getMessage(), containsString("Not 
enough alive nodes to perform reset with clean up."));
     }
 
     @ParameterizedTest(name = "consistencyMode={0}, primaryReplica={1}, 
raftLeader={2}")

Reply via email to