This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f477c5550be IGNITE-27258 Wait for all nodes in MULTI_NODE disaster
recovery requests (#7156)
f477c5550be is described below
commit f477c5550be850a1dab0374900084a846f7e9795
Author: Phillippko <[email protected]>
AuthorDate: Fri Dec 26 13:00:02 2025 +0700
IGNITE-27258 Wait for all nodes in MULTI_NODE disaster recovery requests
(#7156)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 3 +
.../recovery/restart/RestartPartitionsCall.java | 3 +-
.../org/apache/ignite/internal/util/Constants.java | 3 +
.../network/PartitionReplicationMessageGroup.java | 10 +-
.../disaster/OperationCompletedMessage.java | 34 +++++
modules/platforms/cpp/ignite/common/error_codes.h | 1 +
modules/platforms/cpp/ignite/odbc/common_types.cpp | 1 +
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 3 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 +
.../disaster/DisasterRecoveryManager.java | 163 ++++++++++++++++++---
.../disaster/ManualGroupRestartRequest.java | 30 +++-
.../ManualGroupRestartRequestSerializer.java | 16 +-
.../MultiNodeDisasterRecoveryRequest.java} | 25 ++--
.../distributed/disaster/MultiNodeOperations.java | 76 ++++++++++
.../disaster/exceptions/NodeLeftException.java} | 22 +--
.../exceptions/RemoteOperationException.java} | 26 ++--
.../DisasterRecoveryRequestSerializerTest.java | 30 +++-
.../disaster/ItDisasterRecoveryManagerTest.java | 5 +-
18 files changed, 375 insertions(+), 77 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index d029282b65f..02a7d4737d4 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -750,6 +750,9 @@ public class ErrorGroups {
/** Error when forwarding disaster recovery request to another node
failed. */
public static final int REQUEST_FORWARD_ERR =
RECOVERY_ERR_GROUP.registerErrorCode((short) 7);
+
+ /** Error when multi node operation fails on any node. */
+ public static final int REMOTE_NODE_ERR =
RECOVERY_ERR_GROUP.registerErrorCode((short) 8);
}
/** Embedded API error group. */
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/restart/RestartPartitionsCall.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/restart/RestartPartitionsCall.java
index c45b98a5016..63ca9eb0ac1 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/restart/RestartPartitionsCall.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/recovery/restart/RestartPartitionsCall.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.cli.call.recovery.restart;
+import static
org.apache.ignite.internal.util.Constants.DISASTER_RECOVERY_TIMEOUT_MILLIS;
import static org.apache.ignite.internal.util.StringUtils.nullOrEmpty;
import jakarta.inject.Singleton;
@@ -40,7 +41,7 @@ public class RestartPartitionsCall implements
Call<RestartPartitionsCallInput, S
@Override
public DefaultCallOutput<String> execute(RestartPartitionsCallInput input)
{
- RecoveryApi client = new
RecoveryApi(clientFactory.getClient(input.clusterUrl()));
+ RecoveryApi client = new
RecoveryApi(clientFactory.getClient(input.clusterUrl()).setReadTimeout(DISASTER_RECOVERY_TIMEOUT_MILLIS));
try {
if (nullOrEmpty(input.tableName())) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
index 0467ecc52cb..7f385318e07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
@@ -30,6 +30,9 @@ public final class Constants {
/** Bytes in giga-byte (IEC 80000-13). */
public static final int GiB = 1024 * MiB;
+ /** Disaster recovery operations timeout in milliseconds. */
+ public static final int DISASTER_RECOVERY_TIMEOUT_MILLIS = 30_000;
+
/** Stub. */
private Constants() {
// Noop.
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
index 51c154310c3..3ea3d7b7b12 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java
@@ -33,12 +33,15 @@ import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCom
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateMinimumActiveTxBeginTimeCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommandV2;
+import
org.apache.ignite.internal.partition.replicator.network.disaster.DisasterRecoveryRequestMessage;
+import
org.apache.ignite.internal.partition.replicator.network.disaster.DisasterRecoveryResponseMessage;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStateMessage;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesRequest;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPartitionStatesResponse;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateMessage;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateRequest;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateResponse;
+import
org.apache.ignite.internal.partition.replicator.network.disaster.OperationCompletedMessage;
import
org.apache.ignite.internal.partition.replicator.network.message.HasDataRequest;
import
org.apache.ignite.internal.partition.replicator.network.message.HasDataResponse;
import
org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
@@ -307,10 +310,13 @@ public interface PartitionReplicationMessageGroup {
/** Message type for {@link LocalTablePartitionStateResponse}. */
short LOCAL_TABLE_PARTITION_STATE_RESPONSE = 105;
- /** Message type for disaster recovery request forwarding. */
+ /** Message type for {@link DisasterRecoveryRequestMessage}. */
short DISASTER_RECOVERY_REQUEST = 106;
- /** Message type for disaster recovery request forwarding response. */
+ /** Message type for {@link DisasterRecoveryResponseMessage}. */
short DISASTER_RECOVERY_RESPONSE = 111;
+
+ /** Message type for {@link OperationCompletedMessage}. */
+ short OPERATION_COMPLETED = 112;
}
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/OperationCompletedMessage.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/OperationCompletedMessage.java
new file mode 100644
index 00000000000..378d9b28921
--- /dev/null
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/OperationCompletedMessage.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.disaster;
+
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+import org.jetbrains.annotations.Nullable;
+
+/** Sent to initiator of the multi node disaster recovery request after node
completed processing of the operation. */
+@Transferable(DisasterRecoveryMessages.OPERATION_COMPLETED)
+public interface OperationCompletedMessage extends NetworkMessage {
+ /** ID of the completed operation. */
+ UUID operationId();
+
+ /** Exception message if the operation failed. {@code null} if operation
completed successfully. */
+ @Nullable String exceptionMessage();
+}
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h
b/modules/platforms/cpp/ignite/common/error_codes.h
index 30a6a22de7b..232a6fbace7 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -228,6 +228,7 @@ enum class code : underlying_t {
NOT_ENOUGH_ALIVE_NODES = 0x140005,
ILLEGAL_NODES_SET = 0x140006,
REQUEST_FORWARD = 0x140007,
+ REMOTE_NODE = 0x140008,
// Embedded group. Group code: 21
CLUSTER_NOT_INITIALIZED = 0x150001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index f76ee1e4a74..d2beb8349d4 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -314,6 +314,7 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::NOT_ENOUGH_ALIVE_NODES:
case error::code::ILLEGAL_NODES_SET:
case error::code::REQUEST_FORWARD:
+ case error::code::REMOTE_NODE:
return sql_state::SHY000_GENERAL_ERROR;
// Embedded group. Group code: 21
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index a8b52b90ad9..ced201737df 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -729,6 +729,9 @@ namespace Apache.Ignite
/// <summary> RequestForward error. </summary>
public const int RequestForward = (GroupCode << 16) | (7 & 0xFFFF);
+
+ /// <summary> RemoteNode error. </summary>
+ public const int RemoteNode = (GroupCode << 16) | (8 & 0xFFFF);
}
/// <summary> Embedded errors. </summary>
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 1898f1281fd..768623cbdac 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1171,6 +1171,7 @@ public class IgniteImpl implements Ignite {
distributionZoneManager,
raftMgr,
clusterSvc.topologyService(),
+ logicalTopologyService,
distributedTblMgr,
metricManager,
failureManager,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index ec91f4b80f4..eef12c526e4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -21,6 +21,7 @@ import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
@@ -45,6 +46,7 @@ import static
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static
org.apache.ignite.internal.util.Constants.DISASTER_RECOVERY_TIMEOUT_MILLIS;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.PARTITION_STATE_ERR;
@@ -61,7 +63,6 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.Catalog;
@@ -72,6 +73,10 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.DistributionZonesUtil;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
@@ -93,6 +98,7 @@ import
org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
@@ -111,6 +117,7 @@ import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalPar
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateMessage;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateRequest;
import
org.apache.ignite.internal.partition.replicator.network.disaster.LocalTablePartitionStateResponse;
+import
org.apache.ignite.internal.partition.replicator.network.disaster.OperationCompletedMessage;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.raft.Loza;
@@ -130,6 +137,7 @@ import
org.apache.ignite.internal.table.distributed.disaster.exceptions.Disaster
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryRequestForwardException;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalNodesException;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalPartitionIdException;
+import
org.apache.ignite.internal.table.distributed.disaster.exceptions.NodeLeftException;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.NodesNotFoundException;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.NotEnoughAliveNodesException;
import org.apache.ignite.internal.table.distributed.storage.NullMvTableStorage;
@@ -168,9 +176,6 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
- /** Disaster recovery operations timeout in seconds. */
- private static final int TIMEOUT_SECONDS = 30;
-
/**
* Maximal allowed difference between committed index on the leader and on
the follower, that differentiates
* {@link LocalPartitionStateEnum#HEALTHY} from {@link
LocalPartitionStateEnum#CATCHING_UP}.
@@ -201,9 +206,13 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
/** Cluster physical topology service. */
private final TopologyService topologyService;
+ private final LogicalTopologyService logicalTopology;
+
/** Watch listener for {@link #RECOVERY_TRIGGER_KEY}. */
private final WatchListener watchListener;
+ private final LogicalTopologyEventListener nodeLeftListener;
+
/** Table manager. */
final TableManager tableManager;
@@ -226,6 +235,8 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
private final Map<Integer, PartitionStatesMetricSource>
metricSourceByTableId = new ConcurrentHashMap<>();
+ private final Map<UUID, MultiNodeOperations> operationsByNodeId = new
ConcurrentHashMap<>();
+
/** Constructor. */
public DisasterRecoveryManager(
ExecutorService threadPool,
@@ -235,6 +246,7 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
DistributionZoneManager dzManager,
Loza raftManager,
TopologyService topologyService,
+ LogicalTopologyService logicalTopology,
TableManager tableManager,
MetricManager metricManager,
FailureManager failureManager,
@@ -248,6 +260,7 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
this.dzManager = dzManager;
this.raftManager = raftManager;
this.topologyService = topologyService;
+ this.logicalTopology = logicalTopology;
this.tableManager = tableManager;
this.metricManager = metricManager;
this.failureManager = failureManager;
@@ -260,6 +273,19 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
// There is no need to block a watch thread any longer.
return nullCompletedFuture();
});
+
+        nodeLeftListener = new LogicalTopologyEventListener() {
+            @Override
+            public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) {
+                // Detach atomically, then complete outside of any map operation: completing these futures runs cleanup callbacks
+                // that call operationsByNodeId.compute() on this same key, which must not happen from inside compute().
+                MultiNodeOperations operations = operationsByNodeId.remove(leftNode.id());
+
+                if (operations != null) {
+                    operations.completeAllExceptionally(leftNode.name(), new NodeLeftException(leftNode.name(), leftNode.id()));
+                }
+            }
+        };
}
@Override
@@ -279,6 +305,8 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
registerMetricSources();
+ logicalTopology.addEventListener(nodeLeftListener);
+
return nullCompletedFuture();
});
}
@@ -289,6 +317,8 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
metaStorageManager.unregisterWatch(watchListener);
+ logicalTopology.removeEventListener(nodeLeftListener);
+
for (CompletableFuture<Void> future : ongoingOperationsById.values()) {
future.completeExceptionally(new NodeStoppingException());
}
@@ -533,7 +563,8 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
partitionIds,
nodeNames,
catalog.time(),
- false
+ false,
+ localNode().name()
));
} catch (Throwable t) {
return failedFuture(t);
@@ -580,7 +611,8 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
partitionIds,
nodeNames,
catalog.time(),
- true
+ true,
+ localNode().name()
));
} catch (Throwable t) {
return failedFuture(t);
@@ -622,7 +654,8 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
partitionIds,
nodeNames,
catalog.time(),
- false
+ false,
+ localNode().name()
));
} catch (Throwable t) {
return failedFuture(t);
@@ -666,7 +699,8 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
partitionIds,
nodeNames,
catalog.time(),
- true
+ true,
+ localNode().name()
));
} catch (Throwable t) {
return failedFuture(t);
@@ -769,7 +803,7 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
CompletableFuture<NetworkMessage> invokeFuture =
messagingService.invoke(
node.nodeName(),
localPartitionStatesRequest,
- TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS)
+ DISASTER_RECOVERY_TIMEOUT_MILLIS
);
futures[i++] = invokeFuture.thenAccept(networkMessage -> {
@@ -968,7 +1002,7 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
.catalogVersion(catalogVersion)
.build();
- return messagingService.invoke(node, request,
TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS))
+ return messagingService.invoke(node, request,
DISASTER_RECOVERY_TIMEOUT_MILLIS)
.thenApply(networkMessage -> {
if (LOG.isDebugEnabled()) {
LOG.debug("Got response from node [nodeName={},
networkMessage={}]", node, networkMessage);
@@ -1108,10 +1142,10 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
*/
private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest
request, long revision) {
// Check if this is a manual restart request with specific nodes, and
forward to the first node if needed.
- if (request instanceof ManualGroupRestartRequest) {
- ManualGroupRestartRequest restartRequest =
(ManualGroupRestartRequest) request;
+ if (request instanceof MultiNodeDisasterRecoveryRequest) {
+ MultiNodeDisasterRecoveryRequest multiNodeRequest =
(MultiNodeDisasterRecoveryRequest) request;
- Set<String> nodeNames = restartRequest.nodeNames();
+ Set<String> nodeNames = multiNodeRequest.nodeNames();
if (!nodeNames.isEmpty()) {
String firstNode = nodeNames.stream()
@@ -1123,14 +1157,16 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
// If this is not the target node, forward the request and
return its response.
if (!firstNode.equals(localNodeName)) {
- return forwardDisasterRecoveryRequest(request, revision,
firstNode);
+ return forwardDisasterRecoveryRequest(multiNodeRequest,
revision, firstNode);
}
}
}
UUID operationId = request.operationId();
- CompletableFuture<Void> operationFuture = new
CompletableFuture<Void>().orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ CompletableFuture<Void> operationFuture = new
CompletableFuture<Void>().orTimeout(DISASTER_RECOVERY_TIMEOUT_MILLIS,
MILLISECONDS);
+
+ CompletableFuture<Void> remoteProcessingFuture =
remoteProcessingFuture(request);
operationFuture.whenComplete((v, throwable) ->
ongoingOperationsById.remove(operationId));
@@ -1151,7 +1187,69 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
metaStorageManager.put(RECOVERY_TRIGGER_KEY, serializedRequest);
}
- return operationFuture;
+ return operationFuture.thenCompose(v -> remoteProcessingFuture);
+ }
+
+ private CompletableFuture<Void>
remoteProcessingFuture(DisasterRecoveryRequest request) {
+ if (request.type() != DisasterRecoveryRequestType.MULTI_NODE) {
+ return nullCompletedFuture();
+ }
+
+ UUID operationId = request.operationId();
+
+ MultiNodeDisasterRecoveryRequest multiNodeRequest =
(MultiNodeDisasterRecoveryRequest) request;
+
+ Set<NodeWithAttributes> nodes =
getRequestNodes(multiNodeRequest.nodeNames());
+
+ CompletableFuture<?>[] remoteProcessingFutures = nodes
+ .stream()
+ .map(node -> addMultiNodeOperation(node, operationId))
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(remoteProcessingFutures);
+ }
+
+ /** If request node names is empty, returns all nodes in the logical
topology. */
+ private Set<NodeWithAttributes> getRequestNodes(Set<String>
requestNodeNames) {
+ return dzManager.logicalTopology().stream()
+ .filter(node -> requestNodeNames.isEmpty() ||
requestNodeNames.contains(node.nodeName()))
+ .collect(toSet());
+ }
+
+ private CompletableFuture<Void> addMultiNodeOperation(NodeWithAttributes
node, UUID operationId) {
+ CompletableFuture<Void> result = new
CompletableFuture<Void>().orTimeout(DISASTER_RECOVERY_TIMEOUT_MILLIS,
MILLISECONDS);
+
+ operationsByNodeId.compute(node.nodeId(), (nodeId, operations) -> {
+ Set<UUID> nodes = dzManager.logicalTopology().stream()
+ .map(NodeWithAttributes::nodeId)
+ .collect(toSet());
+
+ if (!nodes.contains(nodeId)) {
+ result.completeExceptionally(new
NodeLeftException(node.nodeName(), nodeId));
+
+ return operations;
+ }
+
+ if (operations == null) {
+ operations = new MultiNodeOperations();
+ }
+
+ operations.add(operationId, result);
+
+ return operations;
+ });
+
+ // Cleanup operation on completion.
+ return result.whenComplete((v, e) ->
+ operationsByNodeId.compute(node.nodeId(), (nodeId, operations)
-> {
+ if (operations != null) {
+ operations.remove(operationId);
+
+ return operations.isEmpty() ? null : operations;
+ }
+
+ return null;
+ }));
}
/**
@@ -1194,18 +1292,20 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
* @return Future that completes when the forwarded request is processed.
*/
private CompletableFuture<Void> forwardDisasterRecoveryRequest(
- DisasterRecoveryRequest request,
+ MultiNodeDisasterRecoveryRequest request,
long revision,
String targetNodeName
) {
- byte[] serializedRequest = VersionedSerialization.toBytes(request,
DisasterRecoveryRequestSerializer.INSTANCE);
+ DisasterRecoveryRequest updatedCoordinator =
request.updateCoordinator(targetNodeName);
+
+ byte[] serializedRequest =
VersionedSerialization.toBytes(updatedCoordinator,
DisasterRecoveryRequestSerializer.INSTANCE);
DisasterRecoveryRequestMessage message =
PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryRequestMessage()
.requestBytes(serializedRequest)
.revision(revision)
.build();
- return messagingService.invoke(targetNodeName, message,
TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS))
+ return messagingService.invoke(targetNodeName, message,
DISASTER_RECOVERY_TIMEOUT_MILLIS)
.thenApply(responseMsg -> {
assert responseMsg instanceof
DisasterRecoveryResponseMessage : responseMsg;
@@ -1272,6 +1372,19 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
copyStateTo(operationFuture).accept(res, ex);
}
+ MultiNodeDisasterRecoveryRequest multiNodeRequest
= (MultiNodeDisasterRecoveryRequest) request;
+
+ if (multiNodeRequest.coordinator() != null) {
+ messagingService.send(
+ multiNodeRequest.coordinator(),
+ ChannelType.DEFAULT,
+
PARTITION_REPLICATION_MESSAGES_FACTORY.operationCompletedMessage()
+
.operationId(request.operationId())
+ .exceptionMessage(ex == null ?
null : ex.getMessage())
+ .build()
+ );
+ }
+
if (ex != null) {
if (!hasCause(
ex,
@@ -1311,6 +1424,8 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
handleLocalTableStateRequest((LocalTablePartitionStateRequest)
message, sender, correlationId);
} else if (message instanceof DisasterRecoveryRequestMessage) {
handleDisasterRecoveryRequest((DisasterRecoveryRequestMessage)
message, sender, correlationId);
+ } else if (message instanceof OperationCompletedMessage) {
+ handleOperationCompletedMessage((OperationCompletedMessage)
message, sender);
}
}
@@ -1446,6 +1561,16 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
}, threadPool);
}
+ private void handleOperationCompletedMessage(
+ OperationCompletedMessage message,
+ InternalClusterNode sender
+ ) {
+ MultiNodeOperations multiNodeOperations =
operationsByNodeId.get(sender.id());
+ if (multiNodeOperations != null) {
+ multiNodeOperations.complete(message.operationId(), sender.name(),
message.exceptionMessage());
+ }
+ }
+
private @Nullable LocalTablePartitionStateMessage
handleSizeRequestForTablesInZone(
Set<ZonePartitionId> requestedPartitions,
ZonePartitionId zonePartitionId
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
index 0396572b4e2..5294e4506f8 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
@@ -50,8 +50,9 @@ import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.NotEnoughAliveNodesException;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.CollectionUtils;
+import org.jetbrains.annotations.Nullable;
-class ManualGroupRestartRequest implements DisasterRecoveryRequest {
+class ManualGroupRestartRequest implements MultiNodeDisasterRecoveryRequest {
private final UUID operationId;
private final int zoneId;
@@ -66,6 +67,9 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
private final boolean cleanUp;
+ // Nullable for requests created before coordinator field introduction.
+ private final @Nullable String coordinator;
+
ManualGroupRestartRequest(
UUID operationId,
int zoneId,
@@ -73,7 +77,8 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
Set<Integer> partitionIds,
Set<String> nodeNames,
long assignmentsTimestamp,
- boolean cleanUp
+ boolean cleanUp,
+ @Nullable String coordinator
) {
this.operationId = operationId;
this.zoneId = zoneId;
@@ -82,6 +87,7 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
this.nodeNames = Set.copyOf(nodeNames);
this.assignmentsTimestamp = assignmentsTimestamp;
this.cleanUp = cleanUp;
+ this.coordinator = coordinator;
}
@Override
@@ -107,6 +113,7 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
return partitionIds;
}
+ @Override
public Set<String> nodeNames() {
return nodeNames;
}
@@ -119,6 +126,25 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
return cleanUp;
}
+ @Override
+ public @Nullable String coordinator() {
+ return coordinator;
+ }
+
+ @Override
+ public MultiNodeDisasterRecoveryRequest updateCoordinator(String
newCoordinatorName) {
+ return new ManualGroupRestartRequest(
+ operationId,
+ zoneId,
+ tableId,
+ partitionIds,
+ nodeNames,
+ assignmentsTimestamp,
+ cleanUp,
+ newCoordinatorName
+ );
+ }
+
@Override
public CompletableFuture<Void> handle(DisasterRecoveryManager
disasterRecoveryManager, long revision, HybridTimestamp timestamp) {
return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
index 2cd851c6af4..32159225683 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequestSerializer.java
@@ -20,12 +20,14 @@ package
org.apache.ignite.internal.table.distributed.disaster;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import java.io.IOException;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.util.io.IgniteDataInput;
import org.apache.ignite.internal.util.io.IgniteDataOutput;
import org.apache.ignite.internal.versioned.VersionedSerializer;
+import org.jetbrains.annotations.Nullable;
/**
* {@link VersionedSerializer} for {@link ManualGroupRestartRequest} instances.
@@ -36,11 +38,13 @@ class ManualGroupRestartRequestSerializer extends
VersionedSerializer<ManualGrou
@Override
protected byte getProtocolVersion() {
- return 2;
+ return 3;
}
@Override
protected void writeExternalData(ManualGroupRestartRequest request,
IgniteDataOutput out) throws IOException {
+ Objects.requireNonNull(request.coordinator(), "Coordinator must not be
null");
+
out.writeUuid(request.operationId());
out.writeVarInt(request.zoneId());
out.writeVarInt(request.tableId());
@@ -48,6 +52,7 @@ class ManualGroupRestartRequestSerializer extends
VersionedSerializer<ManualGrou
writeStringSet(request.nodeNames(), out);
hybridTimestamp(request.assignmentsTimestamp()).writeTo(out);
out.writeBoolean(request.cleanUp()); // Write the new 'cleanUp' field
introduced in protocol version 2.
+ out.writeUTF(request.coordinator()); // Write the new 'coordinator'
field introduced in protocol version 3.
}
@Override
@@ -65,6 +70,8 @@ class ManualGroupRestartRequestSerializer extends
VersionedSerializer<ManualGrou
cleanUp = in.readBoolean(); // Read the new 'cleanUp' field if
protocol version is 2 or greater.
}
+ String coordinator = readCoordinator(protoVer, in);
+
return new ManualGroupRestartRequest(
operationId,
zoneId,
@@ -72,7 +79,12 @@ class ManualGroupRestartRequestSerializer extends
VersionedSerializer<ManualGrou
partitionIds,
nodeNames,
assignmentsTimestamp.longValue(),
- cleanUp
+ cleanUp,
+ coordinator
);
}
+
+ private static @Nullable String readCoordinator(byte protoVer,
IgniteDataInput in) throws IOException {
+ return protoVer >= 3 ? in.readUTF() : null;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeDisasterRecoveryRequest.java
similarity index 54%
copy from
modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeDisasterRecoveryRequest.java
index 0467ecc52cb..bc46836de83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeDisasterRecoveryRequest.java
@@ -15,23 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.util;
+package org.apache.ignite.internal.table.distributed.disaster;
-/**
- * Utility class with magic constants.
- */
-public final class Constants {
- /** Bytes in kilo-byte (IEC 80000-13). */
- public static final int KiB = 1024;
+import java.util.Set;
- /** Bytes in mega-byte (IEC 80000-13). */
- public static final int MiB = 1024 * KiB;
+/** {@link DisasterRecoveryRequest} that should be handled by multiple nodes.
*/
+public interface MultiNodeDisasterRecoveryRequest extends
DisasterRecoveryRequest {
+ /** Returns names of nodes involved in the recovery or empty set if all
nodes should be used. */
+ Set<String> nodeNames();
- /** Bytes in giga-byte (IEC 80000-13). */
- public static final int GiB = 1024 * MiB;
+ /** Returns the name of the node that initiated the request. {@code null}
for requests created before field introduction. */
+ String coordinator();
- /** Stub. */
- private Constants() {
- // Noop.
- }
+ /** Returns a new request with updated coordinator name. */
+ MultiNodeDisasterRecoveryRequest updateCoordinator(String
newCoordinatorName);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeOperations.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeOperations.java
new file mode 100644
index 00000000000..d4d493b2efb
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeOperations.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import
org.apache.ignite.internal.table.distributed.disaster.exceptions.RemoteOperationException;
+import org.jetbrains.annotations.Nullable;
+
+/** Contains operations that should be processed by remote node. */
+class MultiNodeOperations {
+ private final Map<UUID, CompletableFuture<Void>> operationsById = new
ConcurrentHashMap<>();
+
+ /** Adds new operation to track. */
+ void add(UUID operationId, CompletableFuture<Void> operationFuture) {
+ operationsById.put(operationId, operationFuture);
+ }
+
+ /**
+ * Removes operation tracking.
+ *
+ * @return Removed operation future.
+ */
+ CompletableFuture<Void> remove(UUID operationId) {
+ return operationsById.remove(operationId);
+ }
+
+ /** Completes all tracked operations with a given exception. */
+ void completeAllExceptionally(String nodeName, Throwable e) {
+ Set<UUID> operationIds = Set.copyOf(operationsById.keySet());
+
+ for (UUID operationId : operationIds) {
+ CompletableFuture<Void> operation =
operationsById.remove(operationId);
+
+ if (operation != null) {
+ operation.completeExceptionally(new
RemoteOperationException(e.getMessage(), nodeName));
+ }
+ }
+ }
+
+ /** Completes operation successfully or with {@link
RemoteOperationException} using provided nodeName and exception message. */
+ public void complete(UUID operationId, String nodeName, @Nullable String
exceptionMessage) {
+ CompletableFuture<Void> operationFuture =
operationsById.remove(operationId);
+
+ if (operationFuture != null) {
+ if (exceptionMessage != null) {
+ operationFuture.completeExceptionally(new
RemoteOperationException(exceptionMessage, nodeName));
+ } else {
+ operationFuture.complete(null);
+ }
+ }
+ }
+
+ /** If there are no ongoing operations. */
+ public boolean isEmpty() {
+ return operationsById.isEmpty();
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NodeLeftException.java
similarity index 58%
copy from
modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NodeLeftException.java
index 0467ecc52cb..b84f575ce45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NodeLeftException.java
@@ -15,23 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.util;
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
-/**
- * Utility class with magic constants.
- */
-public final class Constants {
- /** Bytes in kilo-byte (IEC 80000-13). */
- public static final int KiB = 1024;
+import static
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.REMOTE_NODE_ERR;
- /** Bytes in mega-byte (IEC 80000-13). */
- public static final int MiB = 1024 * KiB;
+import java.util.UUID;
- /** Bytes in giga-byte (IEC 80000-13). */
- public static final int GiB = 1024 * MiB;
+/** Exception thrown when node left before finishing multi node recovery
request. */
+public class NodeLeftException extends DisasterRecoveryException {
+ private static final long serialVersionUID = -6295004626426857229L;
- /** Stub. */
- private Constants() {
- // Noop.
+ public NodeLeftException(String nodeName, UUID nodeId) {
+ super(REMOTE_NODE_ERR, "Node left logical topology [name=" + nodeName
+ ", id=" + nodeId + "]");
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/RemoteOperationException.java
similarity index 56%
copy from
modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/RemoteOperationException.java
index 0467ecc52cb..db3a5f592e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/Constants.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/RemoteOperationException.java
@@ -15,23 +15,19 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.util;
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
-/**
- * Utility class with magic constants.
- */
-public final class Constants {
- /** Bytes in kilo-byte (IEC 80000-13). */
- public static final int KiB = 1024;
-
- /** Bytes in mega-byte (IEC 80000-13). */
- public static final int MiB = 1024 * KiB;
+import org.apache.ignite.lang.ErrorGroups.DisasterRecovery;
- /** Bytes in giga-byte (IEC 80000-13). */
- public static final int GiB = 1024 * MiB;
+/** Exception thrown when remote node encounters an error while executing a
disaster recovery operation. */
+public class RemoteOperationException extends DisasterRecoveryException {
+ private static final long serialVersionUID = 1L;
- /** Stub. */
- private Constants() {
- // Noop.
+ /** Constructor. */
+ public RemoteOperationException(String message, String nodeName) {
+ super(
+ DisasterRecovery.REMOTE_NODE_ERR,
+ "Processing error on node " + nodeName + " during disaster
recovery: " + message
+ );
}
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
index 510c410b12a..0d5100ff6d5 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
@@ -33,6 +33,8 @@ class DisasterRecoveryRequestSerializerTest {
private static final String GROUP_UPDATE_REQUEST_V2_BASE64 =
"Ae++QwEC775D782rkHhWNBIhQ2WHCbrc/ukH0Q8DuRcEIBYMoR8EIBcqAQE=";
private static final String MANUAL_GROUP_RESTART_REQUEST_V1_BASE64 =
"Ae++QwIB775D782rkHhWNBIhQ2WHCbrc/tEPuRcEIBYMAwJiAmH///9///+AgAQ=";
private static final String MANUAL_GROUP_RESTART_REQUEST_V2_BASE64 =
"Ae++QwIC775D782rkHhWNBIhQ2WHCbrc/tEPuRcEFiAMAwJhAmL///9///+AgAQB";
+ private static final String MANUAL_GROUP_RESTART_REQUEST_V3_BASE64 =
"Ae++QwID775D782rkHhWNBIhQ2WHCbrc/tEPuRcEDBYgAwJhAmL///9///+"
+ + "AgAQACW5vZGVOYW1l";
private final DisasterRecoveryRequestSerializer serializer = new
DisasterRecoveryRequestSerializer();
@@ -107,7 +109,8 @@ class DisasterRecoveryRequestSerializerTest {
Set.of(11, 21, 31),
Set.of("a", "b"),
HybridTimestamp.MAX_VALUE.longValue(),
- false
+ false,
+ "nodeName"
);
byte[] bytes = VersionedSerialization.toBytes(originalRequest,
serializer);
@@ -119,6 +122,7 @@ class DisasterRecoveryRequestSerializerTest {
assertThat(restoredRequest.partitionIds(), is(Set.of(11, 21, 31)));
assertThat(restoredRequest.nodeNames(), is(Set.of("a", "b")));
assertThat(restoredRequest.assignmentsTimestamp(),
is(HybridTimestamp.MAX_VALUE.longValue()));
+ assertThat(restoredRequest.coordinator(), is("nodeName"));
}
@Test
@@ -148,8 +152,23 @@ class DisasterRecoveryRequestSerializerTest {
assertThat(restoredRequest.cleanUp(), is(true));
}
+ @Test
+ void v3OfManualGroupRestartRequestCanBeDeserialized() {
+ byte[] bytes =
Base64.getDecoder().decode(MANUAL_GROUP_RESTART_REQUEST_V3_BASE64);
+ ManualGroupRestartRequest restoredRequest =
(ManualGroupRestartRequest) VersionedSerialization.fromBytes(bytes, serializer);
+
+ assertThat(restoredRequest.operationId(), is(new
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
+ assertThat(restoredRequest.zoneId(), is(2000));
+ assertThat(restoredRequest.tableId(), is(3000));
+ assertThat(restoredRequest.partitionIds(), is(Set.of(11, 21, 31)));
+ assertThat(restoredRequest.nodeNames(), is(Set.of("a", "b")));
+ assertThat(restoredRequest.assignmentsTimestamp(),
is(HybridTimestamp.MAX_VALUE.longValue()));
+ assertThat(restoredRequest.cleanUp(), is(false));
+ assertThat(restoredRequest.coordinator(), is("nodeName"));
+ }
+
@SuppressWarnings("unused")
- private String manualGroupRestartRequestV1Base64() {
+ private String manualGroupRestartRequestV3Base64() {
var originalRequest = new ManualGroupRestartRequest(
new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L),
2000,
@@ -157,11 +176,12 @@ class DisasterRecoveryRequestSerializerTest {
Set.of(11, 21, 31),
Set.of("a", "b"),
HybridTimestamp.MAX_VALUE.longValue(),
- false
+ false,
+ "nodeName"
);
- byte[] v1Bytes = VersionedSerialization.toBytes(originalRequest,
serializer);
- return Base64.getEncoder().encodeToString(v1Bytes);
+ byte[] v3Bytes = VersionedSerialization.toBytes(originalRequest,
serializer);
+ return Base64.getEncoder().encodeToString(v3Bytes);
}
@SuppressWarnings("unused")
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index f81b2fc8582..f4d530fad07 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -32,6 +32,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -485,7 +486,7 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
assertInstanceOf(DisasterRecoveryException.class,
exception.getCause());
- assertThat(exception.getCause().getMessage(), is("Not enough alive
nodes to perform reset with clean up."));
+ assertThat(exception.getCause().getMessage(), containsString("Not
enough alive nodes to perform reset with clean up."));
}
@Test
@@ -519,7 +520,7 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
assertInstanceOf(DisasterRecoveryException.class,
exception.getCause());
- assertThat(exception.getCause().getMessage(), is("Not enough alive
nodes to perform reset with clean up."));
+ assertThat(exception.getCause().getMessage(), containsString("Not
enough alive nodes to perform reset with clean up."));
}
@ParameterizedTest(name = "consistencyMode={0}, primaryReplica={1},
raftLeader={2}")