This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 52491546348 IGNITE-26337 Fix restart with cleanup tests (#6699)
52491546348 is described below
commit 52491546348d33721bf9239e9615ba7d07f386e1
Author: Mirza Aliev <[email protected]>
AuthorDate: Sat Oct 11 17:34:18 2025 +0400
IGNITE-26337 Fix restart with cleanup tests (#6699)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 7 +-
.../restart/ItRestartPartitionsTest.java | 27 +-
modules/platforms/cpp/ignite/common/error_codes.h | 3 +-
modules/platforms/cpp/ignite/odbc/common_types.cpp | 3 +-
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 7 +-
.../handler/IgniteInternalExceptionHandler.java | 3 +-
...ControllerRestartPartitionsWithCleanupTest.java | 80 ++-
.../ItHighAvailablePartitionsRecoveryTest.java | 1 +
.../disaster/DisasterRecoveryManager.java | 657 ++++++++++++---------
.../disaster/GroupUpdateRequestHandler.java | 291 ++++-----
.../disaster/ManualGroupRestartRequest.java | 134 +++--
.../disaster/exceptions/IllegalNodesException.java | 30 +
.../exceptions/NotEnoughAliveNodesException.java | 31 +
13 files changed, 738 insertions(+), 536 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index d11eed81505..3c897015f38 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -727,8 +727,11 @@ public class ErrorGroups {
     /** Error while returning partition states. */
     public static final int CLUSTER_NOT_IDLE_ERR = RECOVERY_ERR_GROUP.registerErrorCode((short) 4);

-    /** Error while restarting the cluster with clean up. */
-    public static final int RESTART_WITH_CLEAN_UP_ERR = RECOVERY_ERR_GROUP.registerErrorCode((short) 5);
+    /** Error when not enough alive nodes to perform restart with clean up. */
+    public static final int NOT_ENOUGH_ALIVE_NODES_ERR = RECOVERY_ERR_GROUP.registerErrorCode((short) 5);
+
+    /** Error when node names are not in valid set. */
+    public static final int ILLEGAL_NODES_SET_ERR = RECOVERY_ERR_GROUP.registerErrorCode((short) 6);
}
/** Embedded API error group. */
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/ItRestartPartitionsTest.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/ItRestartPartitionsTest.java
index f795455d85d..5f0e0b32187 100644
---
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/ItRestartPartitionsTest.java
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/ItRestartPartitionsTest.java
@@ -46,11 +46,15 @@ public abstract class ItRestartPartitionsTest extends
CliIntegrationTest {
@BeforeAll
public void createTables() {
-        sql(String.format("CREATE ZONE \"%s\" (REPLICAS %s) storage profiles ['%s']", ZONE, 3, DEFAULT_AIPERSIST_PROFILE_NAME));
+        sql(String.format("CREATE ZONE \"%s\" (REPLICAS %s) storage profiles ['%s']",
+                ZONE,
+                initialNodes(),
+                DEFAULT_AIPERSIST_PROFILE_NAME
+        ));
+
         sql(String.format("CREATE TABLE PUBLIC.\"%s\" (id INT PRIMARY KEY, val INT) ZONE \"%s\"", TABLE_NAME, ZONE));
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
@Test
public void testRestartAllPartitions() {
execute(CLUSTER_URL_OPTION, NODE_URL,
@@ -165,12 +169,15 @@ public abstract class ItRestartPartitionsTest extends
CliIntegrationTest {
));
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26638")
public void testRestartAllPartitionsWithCleanup() {
+ String nodeName = CLUSTER.aliveNode().name();
+
execute(CLUSTER_URL_OPTION, NODE_URL,
RECOVERY_TABLE_NAME_OPTION, QUALIFIED_TABLE_NAME,
RECOVERY_ZONE_NAME_OPTION, ZONE,
+ RECOVERY_NODE_NAMES_OPTION, nodeName,
RECOVERY_WITH_CLEANUP_OPTION
);
@@ -178,13 +185,16 @@ public abstract class ItRestartPartitionsTest extends
CliIntegrationTest {
assertOutputContains("Successfully restarted partitions.");
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26638")
public void testRestartSpecifiedPartitionsWithCleanup() {
+ String nodeName = CLUSTER.aliveNode().name();
+
execute(CLUSTER_URL_OPTION, NODE_URL,
RECOVERY_TABLE_NAME_OPTION, QUALIFIED_TABLE_NAME,
RECOVERY_ZONE_NAME_OPTION, ZONE,
RECOVERY_PARTITION_IDS_OPTION, "1,2",
+ RECOVERY_NODE_NAMES_OPTION, nodeName,
RECOVERY_WITH_CLEANUP_OPTION
);
@@ -192,7 +202,6 @@ public abstract class ItRestartPartitionsTest extends
CliIntegrationTest {
assertOutputContains("Successfully restarted partitions.");
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
@Test
public void testRestartPartitionsByNodesWithCleanup() {
String nodeNames = CLUSTER.runningNodes()
@@ -208,8 +217,8 @@ public abstract class ItRestartPartitionsTest extends
CliIntegrationTest {
RECOVERY_WITH_CLEANUP_OPTION
);
-        assertErrOutputIsEmpty();
-        assertOutputContains("Successfully restarted partitions.");
+        assertOutputIsEmpty();
+        assertErrOutputContains("Only one node name should be specified for the operation.");
}
@Test
@@ -227,7 +236,7 @@ public abstract class ItRestartPartitionsTest extends
CliIntegrationTest {
RECOVERY_WITH_CLEANUP_OPTION
);
-        assertErrOutputIsEmpty();
-        assertOutputContains("Successfully restarted partitions.");
+        assertOutputIsEmpty();
+        assertErrOutputContains("Only one node name should be specified for the operation.");
}
}
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h
b/modules/platforms/cpp/ignite/common/error_codes.h
index 00915319d30..39db509458c 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -220,7 +220,8 @@ enum class code : underlying_t {
NODES_NOT_FOUND = 0x140002,
PARTITION_STATE = 0x140003,
CLUSTER_NOT_IDLE = 0x140004,
- RESTART_WITH_CLEAN_UP = 0x140005,
+ NOT_ENOUGH_ALIVE_NODES = 0x140005,
+ ILLEGAL_NODES_SET = 0x140006,
// Embedded group. Group code: 21
CLUSTER_NOT_INITIALIZED = 0x150001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index 55d6e90caf8..2456ea01710 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -305,7 +305,8 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::ILLEGAL_PARTITION_ID:
case error::code::PARTITION_STATE:
case error::code::CLUSTER_NOT_IDLE:
- case error::code::RESTART_WITH_CLEAN_UP:
+ case error::code::NOT_ENOUGH_ALIVE_NODES:
+ case error::code::ILLEGAL_NODES_SET:
return sql_state::SHY000_GENERAL_ERROR;
// Embedded group. Group code: 21
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index e10b6464a8e..b3c2120a7c9 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -706,8 +706,11 @@ namespace Apache.Ignite
/// <summary> ClusterNotIdle error. </summary>
public const int ClusterNotIdle = (GroupCode << 16) | (4 & 0xFFFF);
-        /// <summary> RestartWithCleanUp error. </summary>
-        public const int RestartWithCleanUp = (GroupCode << 16) | (5 & 0xFFFF);
+        /// <summary> NotEnoughAliveNodes error. </summary>
+        public const int NotEnoughAliveNodes = (GroupCode << 16) | (5 & 0xFFFF);
+
+        /// <summary> IllegalNodesSet error. </summary>
+        public const int IllegalNodesSet = (GroupCode << 16) | (6 & 0xFFFF);
}
/// <summary> Embedded errors. </summary>
diff --git
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
index 576acb3937b..5739c244e64 100644
---
a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
+++
b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/exception/handler/IgniteInternalExceptionHandler.java
@@ -40,7 +40,8 @@ public class IgniteInternalExceptionHandler implements
ExceptionHandler<IgniteIn
private static final Set<Integer> BAD_REQUEST_CODES = Set.of(
DistributionZones.ZONE_NOT_FOUND_ERR,
DisasterRecovery.ILLEGAL_PARTITION_ID_ERR,
- DisasterRecovery.NODES_NOT_FOUND_ERR
+ DisasterRecovery.NODES_NOT_FOUND_ERR,
+ DisasterRecovery.ILLEGAL_NODES_SET_ERR
);
@Override
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java
index ac93564d27b..14943569d83 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java
@@ -70,8 +70,13 @@ public class
ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest extend
HttpClient client;
@BeforeAll
- public void setUp() {
- sql(String.format("CREATE ZONE \"%s\" (REPLICAS %s) storage profiles
['%s']", FIRST_ZONE, 3, DEFAULT_AIPERSIST_PROFILE_NAME));
+ public void setUp() throws InterruptedException {
+ sql(String.format(
+ "CREATE ZONE \"%s\" (REPLICAS %s) storage profiles ['%s']",
+ FIRST_ZONE,
+ initialNodes(),
+ DEFAULT_AIPERSIST_PROFILE_NAME
+ ));
sql(String.format("CREATE TABLE PUBLIC.\"%s\" (id INT PRIMARY KEY, val
INT) ZONE \"%s\"", TABLE_NAME,
FIRST_ZONE));
@@ -84,8 +89,7 @@ public class
ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest extend
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(),
unknownZone, QUALIFIED_TABLE_NAME, Set.of());
- HttpClientResponseException e =
assertThrows(HttpClientResponseException.class,
- () -> client.toBlocking().exchange(post));
+ HttpClientResponseException e =
assertThrows(HttpClientResponseException.class, () ->
client.toBlocking().exchange(post));
assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
@@ -100,8 +104,7 @@ public class
ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest extend
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(),
FIRST_ZONE, tableName, Set.of());
- HttpClientResponseException e =
assertThrows(HttpClientResponseException.class,
- () -> client.toBlocking().exchange(post));
+ HttpClientResponseException e =
assertThrows(HttpClientResponseException.class, () ->
client.toBlocking().exchange(post));
assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
assertThat(e.getMessage(), containsString("The table does not exist
[name=" + tableName.toUpperCase() + "]"));
@@ -111,8 +114,7 @@ public class
ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest extend
void testRestartPartitionsWithCleanupIllegalPartitionNegative() {
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(),
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 5, -1, -10));
- HttpClientResponseException e =
assertThrows(HttpClientResponseException.class,
- () -> client.toBlocking().exchange(post));
+ HttpClientResponseException e =
assertThrows(HttpClientResponseException.class, () ->
client.toBlocking().exchange(post));
assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
@@ -123,8 +125,7 @@ public class
ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest extend
void testRestartPartitionsWithCleanupPartitionsOutOfRange() {
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(),
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(DEFAULT_PARTITION_COUNT));
- HttpClientResponseException e =
assertThrows(HttpClientResponseException.class,
- () -> client.toBlocking().exchange(post));
+ HttpClientResponseException e =
assertThrows(HttpClientResponseException.class, () ->
client.toBlocking().exchange(post));
assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
assertThat(e.getMessage(), containsString(
@@ -139,33 +140,31 @@ public class
ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest extend
@Test
void testRestartPartitionsWithCleanupNodesAreCaseSensitive() {
- Set<String> uppercaseNodeNames = nodeNames(initialNodes() - 1).stream()
- .map(String::toUpperCase)
- .collect(toSet());
+ Set<String> uppercaseNodeName =
Set.of(CLUSTER.aliveNode().name().toUpperCase());
- MutableHttpRequest<?> post =
restartPartitionsRequest(uppercaseNodeNames, FIRST_ZONE, QUALIFIED_TABLE_NAME,
Set.of());
+ MutableHttpRequest<?> post =
restartPartitionsRequest(uppercaseNodeName, FIRST_ZONE, QUALIFIED_TABLE_NAME,
Set.of());
- HttpClientResponseException e =
assertThrows(HttpClientResponseException.class,
- () -> client.toBlocking().exchange(post));
+ HttpClientResponseException e =
assertThrows(HttpClientResponseException.class, () ->
client.toBlocking().exchange(post));
assertThat(e.getStatus(), equalTo(HttpStatus.BAD_REQUEST));
- uppercaseNodeNames.forEach(nodeName -> assertThat(e.getMessage(),
containsString(nodeName)));
+ uppercaseNodeName.forEach(nodeName -> assertThat(e.getMessage(),
containsString(nodeName)));
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
- public void testRestartAllPartitionsWithCleanup() {
+ public void testPartitionsWithCleanupEmptySetOfNodes() {
MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(),
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of());
- HttpResponse<Void> response = client.toBlocking().exchange(post);
+ HttpClientResponseException e =
assertThrows(HttpClientResponseException.class, () ->
client.toBlocking().exchange(post));
- assertThat(response.getStatus().getCode(), is(OK.code()));
+ assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
+ assertThat(e.getMessage(), containsString("Only one node name should
be specified for the operation."));
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
public void testRestartSpecifiedPartitionsWithCleanup() {
- MutableHttpRequest<?> post = restartPartitionsRequest(Set.of(),
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 1));
+ Set<String> nodeName = Set.of(CLUSTER.aliveNode().name());
+
+ MutableHttpRequest<?> post = restartPartitionsRequest(nodeName,
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of(0, 1));
HttpResponse<Void> response = client.toBlocking().exchange(post);
@@ -173,25 +172,50 @@ public class
ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest extend
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
public void testRestartPartitionsWithCleanupByNodes() {
- Set<String> nodeNames = nodeNames(initialNodes() - 1);
+ Set<String> nodeNames = nodeNames(initialNodes());
MutableHttpRequest<?> post = restartPartitionsRequest(nodeNames,
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of());
- HttpResponse<Void> response = client.toBlocking().exchange(post);
+ HttpClientResponseException e =
assertThrows(HttpClientResponseException.class, () ->
client.toBlocking().exchange(post));
- assertThat(response.getStatus().getCode(), is(OK.code()));
+ assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
+ assertThat(e.getMessage(), containsString("Only one node name should
be specified for the operation."));
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26337")
public void testRestartTablePartitionsWithCleanupByNodes() {
Set<String> nodeNames = nodeNames(initialNodes() - 1);
MutableHttpRequest<?> post =
HttpRequest.POST(RESTART_PARTITIONS_WITH_CLEANUP_ENDPOINT,
new RestartPartitionsRequest(nodeNames, FIRST_ZONE,
QUALIFIED_TABLE_NAME, Set.of()));
+ HttpClientResponseException e =
assertThrows(HttpClientResponseException.class, () ->
client.toBlocking().exchange(post));
+
+ assertThat(e.getResponse().code(), is(BAD_REQUEST.code()));
+ assertThat(e.getMessage(), containsString("Only one node name should
be specified for the operation."));
+ }
+
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26638")
+ public void testRestartPartitionsWithCleanupAllPartitions() {
+ Set<String> nodeName = Set.of(CLUSTER.aliveNode().name());
+
+ MutableHttpRequest<?> post = restartPartitionsRequest(nodeName,
FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of());
+
+ HttpResponse<Void> response = client.toBlocking().exchange(post);
+
+ assertThat(response.getStatus().getCode(), is(OK.code()));
+ }
+
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26638")
+ public void testRestartTablePartitionsWithCleanupAllPartitions() {
+ Set<String> nodeName = Set.of(CLUSTER.aliveNode().name());
+
+ MutableHttpRequest<?> post =
HttpRequest.POST(RESTART_PARTITIONS_WITH_CLEANUP_ENDPOINT,
+ new RestartPartitionsRequest(nodeName, FIRST_ZONE,
QUALIFIED_TABLE_NAME, Set.of()));
+
HttpResponse<Void> response = client.toBlocking().exchange(post);
assertThat(response.getStatus().getCode(), is(OK.code()));
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
index 234ed90c073..71d4ceefb30 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
@@ -320,6 +320,7 @@ public class ItHighAvailablePartitionsRecoveryTest extends
AbstractHighAvailable
}
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26657")
void testRebalanceInHaZone() throws InterruptedException {
createHaZoneWithTable();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index a8e8c1dfa69..cf18db038f0 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -46,6 +46,8 @@ import static
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.PARTITION_STATE_ERR;
import java.util.ArrayList;
@@ -123,10 +125,13 @@ import
org.apache.ignite.internal.systemview.api.SystemViewProvider;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
+import
org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalNodesException;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.IllegalPartitionIdException;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.NodesNotFoundException;
+import
org.apache.ignite.internal.table.distributed.disaster.exceptions.NotEnoughAliveNodesException;
import
org.apache.ignite.internal.table.distributed.disaster.exceptions.ZonesNotFoundException;
import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.versioned.VersionedSerialization;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.raft.jraft.RaftGroupService;
@@ -168,6 +173,9 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
*/
private static final int CATCH_UP_THRESHOLD = 100;
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
/** Thread pool executor for async parts. */
private final ExecutorService threadPool;
@@ -246,39 +254,43 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
this.nodeProperties = nodeProperties;
this.systemViewManager = systemViewManager;
- watchListener = event -> {
+ watchListener = event -> inBusyLock(busyLock, () -> {
handleTriggerKeyUpdate(event);
// There is no need to block a watch thread any longer.
return nullCompletedFuture();
- };
+ });
}
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
- systemViewManager.register(this);
+ return inBusyLock(busyLock, () -> {
+ systemViewManager.register(this);
-
messagingService.addMessageHandler(PartitionReplicationMessageGroup.class,
this::handleMessage);
+
messagingService.addMessageHandler(PartitionReplicationMessageGroup.class,
this::handleMessage);
- metaStorageManager.registerExactWatch(RECOVERY_TRIGGER_KEY,
watchListener);
+ metaStorageManager.registerExactWatch(RECOVERY_TRIGGER_KEY,
watchListener);
- if (!nodeProperties.colocationEnabled()) {
- dzManager.listen(HaZoneTopologyUpdateEvent.TOPOLOGY_REDUCED,
this::onHaZoneTablePartitionTopologyReduce);
- } else {
- dzManager.listen(HaZoneTopologyUpdateEvent.TOPOLOGY_REDUCED,
this::onHaZonePartitionTopologyReduce);
- }
+ if (!nodeProperties.colocationEnabled()) {
+ dzManager.listen(HaZoneTopologyUpdateEvent.TOPOLOGY_REDUCED,
this::onHaZoneTablePartitionTopologyReduce);
+ } else {
+ dzManager.listen(HaZoneTopologyUpdateEvent.TOPOLOGY_REDUCED,
this::onHaZonePartitionTopologyReduce);
+ }
- catalogManager.listen(TABLE_CREATE, fromConsumer(this::onTableCreate));
+ catalogManager.listen(TABLE_CREATE,
fromConsumer(this::onTableCreate));
- catalogManager.listen(TABLE_DROP, fromConsumer(this::onTableDrop));
+ catalogManager.listen(TABLE_DROP, fromConsumer(this::onTableDrop));
- registerMetricSources();
+ registerMetricSources();
- return nullCompletedFuture();
+ return nullCompletedFuture();
+ });
}
@Override
public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
+ busyLock.block();
+
metaStorageManager.unregisterWatch(watchListener);
for (CompletableFuture<Void> future : ongoingOperationsById.values()) {
@@ -310,60 +322,70 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
return ongoingOperationsById;
}
- private CompletableFuture<Boolean>
onHaZoneTablePartitionTopologyReduce(HaZoneTopologyUpdateEventParams params) {
- int zoneId = params.zoneId();
- long revision = params.causalityToken();
- long timestamp =
metaStorageManager.timestampByRevisionLocally(revision).longValue();
-
- Catalog catalog = catalogManager.activeCatalog(timestamp);
- CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+ public IgniteSpinBusyLock busyLock() {
+ return busyLock;
+ }
- Map<Integer, Set<Integer>> tablePartitionsToReset = new HashMap<>();
- for (CatalogTableDescriptor table : catalog.tables(zoneId)) {
- Set<Integer> partitionsToReset = new HashSet<>();
- for (int partId = 0; partId < zoneDescriptor.partitions();
partId++) {
- TablePartitionId partitionId = new
TablePartitionId(table.id(), partId);
+ private CompletableFuture<Boolean>
onHaZoneTablePartitionTopologyReduce(HaZoneTopologyUpdateEventParams params) {
+ return inBusyLock(busyLock, () -> {
+ int zoneId = params.zoneId();
+ long revision = params.causalityToken();
+ long timestamp =
metaStorageManager.timestampByRevisionLocally(revision).longValue();
+
+ Catalog catalog = catalogManager.activeCatalog(timestamp);
+ CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+
+ Map<Integer, Set<Integer>> tablePartitionsToReset = new
HashMap<>();
+ for (CatalogTableDescriptor table : catalog.tables(zoneId)) {
+ Set<Integer> partitionsToReset = new HashSet<>();
+ for (int partId = 0; partId < zoneDescriptor.partitions();
partId++) {
+ TablePartitionId partitionId = new
TablePartitionId(table.id(), partId);
+
+ if (stableAssignmentsWithOnlyAliveNodes(partitionId,
revision, false).size() < calculateQuorum(
+ zoneDescriptor.replicas())) {
+ partitionsToReset.add(partId);
+ }
+ }
- if (stableAssignmentsWithOnlyAliveNodes(partitionId, revision,
false).size() < calculateQuorum(zoneDescriptor.replicas())) {
- partitionsToReset.add(partId);
+ if (!partitionsToReset.isEmpty()) {
+ tablePartitionsToReset.put(table.id(), partitionsToReset);
}
}
- if (!partitionsToReset.isEmpty()) {
- tablePartitionsToReset.put(table.id(), partitionsToReset);
+ if (!tablePartitionsToReset.isEmpty()) {
+ return resetPartitions(zoneDescriptor.name(),
tablePartitionsToReset, false, revision, false).thenApply(r -> false);
+ } else {
+ return falseCompletedFuture();
}
- }
-
- if (!tablePartitionsToReset.isEmpty()) {
- return resetPartitions(zoneDescriptor.name(),
tablePartitionsToReset, false, revision, false).thenApply(r -> false);
- } else {
- return falseCompletedFuture();
- }
+ });
}
private CompletableFuture<Boolean>
onHaZonePartitionTopologyReduce(HaZoneTopologyUpdateEventParams params) {
- int zoneId = params.zoneId();
- long revision = params.causalityToken();
- long timestamp =
metaStorageManager.timestampByRevisionLocally(revision).longValue();
+ return inBusyLock(busyLock, () -> {
+ int zoneId = params.zoneId();
+ long revision = params.causalityToken();
+ long timestamp =
metaStorageManager.timestampByRevisionLocally(revision).longValue();
- Catalog catalog = catalogManager.activeCatalog(timestamp);
- CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+ Catalog catalog = catalogManager.activeCatalog(timestamp);
+ CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
- Set<Integer> partitionsToReset = new HashSet<>();
+ Set<Integer> partitionsToReset = new HashSet<>();
- for (int partId = 0; partId < zoneDescriptor.partitions(); partId++) {
- ZonePartitionId partitionId = new ZonePartitionId(zoneId, partId);
+ for (int partId = 0; partId < zoneDescriptor.partitions();
partId++) {
+ ZonePartitionId partitionId = new ZonePartitionId(zoneId,
partId);
- if (stableAssignmentsWithOnlyAliveNodes(partitionId, revision,
true).size() < calculateQuorum(zoneDescriptor.replicas())) {
- partitionsToReset.add(partId);
+ if (stableAssignmentsWithOnlyAliveNodes(partitionId, revision,
true).size() < calculateQuorum(zoneDescriptor.replicas())) {
+ partitionsToReset.add(partId);
+ }
}
- }
- if (!partitionsToReset.isEmpty()) {
- return resetPartitions(zoneDescriptor.name(), Map.of(zoneId,
partitionsToReset), false, revision, true).thenApply(r -> false);
- } else {
- return falseCompletedFuture();
- }
+ if (!partitionsToReset.isEmpty()) {
+ return resetPartitions(zoneDescriptor.name(), Map.of(zoneId,
partitionsToReset), false, revision, true).thenApply(
+ r -> false);
+ } else {
+ return falseCompletedFuture();
+ }
+ });
}
private Set<Assignment>
stableAssignmentsWithOnlyAliveNodes(ReplicationGroupId partitionId, long
revision, boolean colocationEnabled) {
@@ -403,9 +425,11 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
* @return Future that completes when partitions are reset.
*/
public CompletableFuture<Void> resetTablePartitions(String zoneName,
String schemaName, String tableName, Set<Integer> partitionIds) {
- int tableId = tableDescriptor(catalogLatestVersion(), schemaName,
tableName).id();
+ return inBusyLock(busyLock, () -> {
+ int tableId = tableDescriptor(catalogLatestVersion(), schemaName,
tableName).id();
- return resetPartitions(zoneName, Map.of(tableId, partitionIds), true,
-1, false);
+ return resetPartitions(zoneName, Map.of(tableId, partitionIds),
true, -1, false);
+ });
}
/**
@@ -430,9 +454,11 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
boolean manualUpdate,
long triggerRevision
) {
- int tableId = tableDescriptor(catalogLatestVersion(), schemaName,
tableName).id();
+ return inBusyLock(busyLock, () -> {
+ int tableId = tableDescriptor(catalogLatestVersion(), schemaName,
tableName).id();
- return resetPartitions(zoneName, Map.of(tableId, partitionIds),
manualUpdate, triggerRevision, false);
+ return resetPartitions(zoneName, Map.of(tableId, partitionIds),
manualUpdate, triggerRevision, false);
+ });
}
/**
@@ -446,9 +472,11 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
* @return Future that completes when partitions are reset.
*/
public CompletableFuture<Void> resetPartitions(String zoneName,
Set<Integer> partitionIds) {
- int zoneId = zoneDescriptor(catalogLatestVersion(), zoneName).id();
+ return inBusyLock(busyLock, () -> {
+ int zoneId = zoneDescriptor(catalogLatestVersion(), zoneName).id();
- return resetPartitions(zoneName, Map.of(zoneId, partitionIds), true,
-1, true);
+ return resetPartitions(zoneName, Map.of(zoneId, partitionIds),
true, -1, true);
+ });
}
/**
@@ -469,9 +497,11 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
boolean manualUpdate,
long triggerRevision
) {
- int zoneId = zoneDescriptor(catalogLatestVersion(), zoneName).id();
+ return inBusyLock(busyLock, () -> {
+ int zoneId = zoneDescriptor(catalogLatestVersion(), zoneName).id();
- return resetPartitions(zoneName, Map.of(zoneId, partitionIds),
manualUpdate, triggerRevision, true);
+ return resetPartitions(zoneName, Map.of(zoneId, partitionIds),
manualUpdate, triggerRevision, true);
+ });
}
/**
@@ -494,27 +524,29 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
long triggerRevision,
boolean colocationEnabled
) {
- try {
- Catalog catalog = catalogLatestVersion();
-
- CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
-
- partitionIds.values().forEach(ids -> checkPartitionsRange(ids,
Set.of(zone)));
-
- return processNewRequest(
- GroupUpdateRequest.create(
- UUID.randomUUID(),
- catalog.version(),
- zone.id(),
- partitionIds,
- manualUpdate,
- colocationEnabled
- ),
- triggerRevision
- );
- } catch (Throwable t) {
- return failedFuture(t);
- }
+ return inBusyLock(busyLock, () -> {
+ try {
+ Catalog catalog = catalogLatestVersion();
+
+ CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
+
+ partitionIds.values().forEach(ids -> checkPartitionsRange(ids,
Set.of(zone)));
+
+ return processNewRequest(
+ GroupUpdateRequest.create(
+ UUID.randomUUID(),
+ catalog.version(),
+ zone.id(),
+ partitionIds,
+ manualUpdate,
+ colocationEnabled
+ ),
+ triggerRevision
+ );
+ } catch (Throwable t) {
+ return failedFuture(t);
+ }
+ });
}
/**
@@ -534,36 +566,38 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
String tableName,
Set<Integer> partitionIds
) {
- try {
- // Validates passed node names.
- getNodes(nodeNames);
+ return inBusyLock(busyLock, () -> {
+ try {
+ // Validates passed node names.
+ getNodes(nodeNames);
- Catalog catalog = catalogLatestVersion();
+ Catalog catalog = catalogLatestVersion();
- CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
+ CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
- CatalogTableDescriptor table = tableDescriptor(catalog,
schemaName, tableName);
+ CatalogTableDescriptor table = tableDescriptor(catalog,
schemaName, tableName);
- checkPartitionsRange(partitionIds, Set.of(zone));
+ checkPartitionsRange(partitionIds, Set.of(zone));
- return processNewRequest(new ManualGroupRestartRequest(
- UUID.randomUUID(),
- zone.id(),
- table.id(),
- partitionIds,
- nodeNames,
- catalog.time(),
- false
- ));
- } catch (Throwable t) {
- return failedFuture(t);
- }
+ return processNewRequest(new ManualGroupRestartRequest(
+ UUID.randomUUID(),
+ zone.id(),
+ table.id(),
+ partitionIds,
+ nodeNames,
+ catalog.time(),
+ false
+ ));
+ } catch (Throwable t) {
+ return failedFuture(t);
+ }
+ });
}
/**
* Restarts replica service and raft group of passed partitions with
cleaning up partition storages.
*
- * @param nodeNames Names specifying nodes to restart partitions.
Case-sensitive, empty set means "all nodes".
+ * @param nodeNames Names specifying nodes to restart partitions. Only one
node is allowed.
* @param zoneName Name of the distribution zone. Case-sensitive, without
quotes.
* @param schemaName Schema name. Case-sensitive, without quotes.
* @param tableName Table name. Case-sensitive, without quotes.
@@ -577,30 +611,34 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
String tableName,
Set<Integer> partitionIds
) {
- try {
- // Validates passed node names.
- getNodes(nodeNames);
+ return inBusyLock(busyLock, () -> {
+ try {
+ // Validates passed node names.
+ getNodes(nodeNames);
- Catalog catalog = catalogLatestVersion();
+ Catalog catalog = catalogLatestVersion();
- CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
+ CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
- CatalogTableDescriptor table = tableDescriptor(catalog,
schemaName, tableName);
+ CatalogTableDescriptor table = tableDescriptor(catalog,
schemaName, tableName);
- checkPartitionsRange(partitionIds, Set.of(zone));
+ checkPartitionsRange(partitionIds, Set.of(zone));
- return processNewRequest(new ManualGroupRestartRequest(
- UUID.randomUUID(),
- zone.id(),
- table.id(),
- partitionIds,
- nodeNames,
- catalog.time(),
- true
- ));
- } catch (Throwable t) {
- return failedFuture(t);
- }
+ checkOnlyOneNodeSpecified(nodeNames);
+
+ return processNewRequest(new ManualGroupRestartRequest(
+ UUID.randomUUID(),
+ zone.id(),
+ table.id(),
+ partitionIds,
+ nodeNames,
+ catalog.time(),
+ true
+ ));
+ } catch (Throwable t) {
+ return failedFuture(t);
+ }
+ });
}
/**
@@ -616,37 +654,39 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
String zoneName,
Set<Integer> partitionIds
) {
- try {
- // Validates passed node names.
- getNodes(nodeNames);
-
- Catalog catalog = catalogLatestVersion();
-
- CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
-
- checkPartitionsRange(partitionIds, Set.of(zone));
-
- return processNewRequest(new ManualGroupRestartRequest(
- UUID.randomUUID(),
- zone.id(),
- // We pass here -1 as table id because it is not used for
zone-based partitions.
- // We expect that the field will be removed once
colocation track is finished.
- // TODO: https://issues.apache.org/jira/browse/IGNITE-22522
- -1,
- partitionIds,
- nodeNames,
- catalog.time(),
- false
- ));
- } catch (Throwable t) {
- return failedFuture(t);
- }
+ return inBusyLock(busyLock, () -> {
+ try {
+ // Validates passed node names.
+ getNodes(nodeNames);
+
+ Catalog catalog = catalogLatestVersion();
+
+ CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
+
+ checkPartitionsRange(partitionIds, Set.of(zone));
+
+ return processNewRequest(new ManualGroupRestartRequest(
+ UUID.randomUUID(),
+ zone.id(),
+ // We pass here -1 as table id because it is not used
for zone-based partitions.
+ // We expect that the field will be removed once
colocation track is finished.
+ // TODO:
https://issues.apache.org/jira/browse/IGNITE-22522
+ -1,
+ partitionIds,
+ nodeNames,
+ catalog.time(),
+ false
+ ));
+ } catch (Throwable t) {
+ return failedFuture(t);
+ }
+ });
}
/**
* Restart partitions of a zone with cleanup. This method destroys
partition storage during restart.
*
- * @param nodeNames Names of nodes to restart partitions on. If empty,
restart on all nodes.
+ * @param nodeNames Names of nodes to restart partitions on. Only one node
is allowed.
* @param zoneName Zone name. Case-sensitive, without quotes.
* @param partitionIds IDs of partitions to restart. If empty, restart all
zone's partitions.
* @return Future that completes when partitions are restarted.
@@ -656,31 +696,35 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
String zoneName,
Set<Integer> partitionIds
) {
- try {
- // Validates passed node names.
- getNodes(nodeNames);
-
- Catalog catalog = catalogLatestVersion();
-
- CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
-
- checkPartitionsRange(partitionIds, Set.of(zone));
-
- return processNewRequest(new ManualGroupRestartRequest(
- UUID.randomUUID(),
- zone.id(),
- // We pass here -1 as table id because it is not used for
zone-based partitions.
- // We expect that the field will be removed once
colocation track is finished.
- // TODO: https://issues.apache.org/jira/browse/IGNITE-22522
- -1,
- partitionIds,
- nodeNames,
- catalog.time(),
- true
- ));
- } catch (Throwable t) {
- return failedFuture(t);
- }
+ return inBusyLock(busyLock, () -> {
+ try {
+ // Validates passed node names.
+ getNodes(nodeNames);
+
+ Catalog catalog = catalogLatestVersion();
+
+ CatalogZoneDescriptor zone = zoneDescriptor(catalog, zoneName);
+
+ checkPartitionsRange(partitionIds, Set.of(zone));
+
+ checkOnlyOneNodeSpecified(nodeNames);
+
+ return processNewRequest(new ManualGroupRestartRequest(
+ UUID.randomUUID(),
+ zone.id(),
+ // We pass here -1 as table id because it is not used
for zone-based partitions.
+ // We expect that the field will be removed once
colocation track is finished.
+ // TODO:
https://issues.apache.org/jira/browse/IGNITE-22522
+ -1,
+ partitionIds,
+ nodeNames,
+ catalog.time(),
+ true
+ ));
+ } catch (Throwable t) {
+ return failedFuture(t);
+ }
+ });
}
/**
@@ -697,21 +741,23 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
Set<String> nodeNames,
Set<Integer> partitionIds
) {
- try {
- assert nodeProperties.colocationEnabled() : "Zone based
replication is unavailable use localTablePartitionStates";
-
- Catalog catalog = catalogLatestVersion();
-
- return localPartitionStatesInternal(
- zoneNames,
- nodeNames,
- partitionIds,
- catalog,
- zoneState()
- ).thenApply(res -> normalizeLocal(res, catalog));
- } catch (Throwable t) {
- return failedFuture(t);
- }
+ return inBusyLock(busyLock, () -> {
+ try {
+ assert nodeProperties.colocationEnabled() : "Zone based
replication is unavailable use localTablePartitionStates";
+
+ Catalog catalog = catalogLatestVersion();
+
+ return localPartitionStatesInternal(
+ zoneNames,
+ nodeNames,
+ partitionIds,
+ catalog,
+ zoneState()
+ ).thenApply(res -> normalizeLocal(res, catalog));
+ } catch (Throwable t) {
+ return failedFuture(t);
+ }
+ });
}
/**
@@ -725,23 +771,25 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
Set<String> zoneNames,
Set<Integer> partitionIds
) {
- try {
- assert nodeProperties.colocationEnabled() : "Zone based
replication is unavailable use globalTablePartitionStates";
-
- Catalog catalog = catalogLatestVersion();
-
- return localPartitionStatesInternal(
- zoneNames,
- Set.of(),
- partitionIds,
- catalog,
- zoneState()
- )
- .thenApply(res -> normalizeLocal(res, catalog))
- .thenApply(res -> assembleGlobal(res, partitionIds,
catalog));
- } catch (Throwable t) {
- return failedFuture(t);
- }
+ return inBusyLock(busyLock, () -> {
+ try {
+ assert nodeProperties.colocationEnabled() : "Zone based
replication is unavailable use globalTablePartitionStates";
+
+ Catalog catalog = catalogLatestVersion();
+
+ return localPartitionStatesInternal(
+ zoneNames,
+ Set.of(),
+ partitionIds,
+ catalog,
+ zoneState()
+ )
+ .thenApply(res -> normalizeLocal(res, catalog))
+ .thenApply(res -> assembleGlobal(res, partitionIds,
catalog));
+ } catch (Throwable t) {
+ return failedFuture(t);
+ }
+ });
}
static Function<LocalPartitionStateMessage, ZonePartitionId> zoneState() {
@@ -755,56 +803,60 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
Catalog catalog,
Function<LocalPartitionStateMessage, T> keyExtractor
) {
- Collection<CatalogZoneDescriptor> zones = filterZones(zoneNames,
catalog.zones());
+ return inBusyLock(busyLock, () -> {
+ Collection<CatalogZoneDescriptor> zones = filterZones(zoneNames,
catalog.zones());
- checkPartitionsRange(partitionIds, zones);
+ checkPartitionsRange(partitionIds, zones);
- Set<NodeWithAttributes> nodes = getNodes(nodeNames);
+ Set<NodeWithAttributes> nodes = getNodes(nodeNames);
- Set<Integer> zoneIds =
zones.stream().map(CatalogObjectDescriptor::id).collect(toSet());
+ Set<Integer> zoneIds =
zones.stream().map(CatalogObjectDescriptor::id).collect(toSet());
- LocalPartitionStatesRequest localPartitionStatesRequest =
PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStatesRequest()
- .zoneIds(zoneIds)
- .partitionIds(partitionIds)
- .catalogVersion(catalog.version())
- .build();
+ LocalPartitionStatesRequest localPartitionStatesRequest =
PARTITION_REPLICATION_MESSAGES_FACTORY.localPartitionStatesRequest()
+ .zoneIds(zoneIds)
+ .partitionIds(partitionIds)
+ .catalogVersion(catalog.version())
+ .build();
- Map<T, LocalPartitionStateMessageByNode> result = new
ConcurrentHashMap<>();
- CompletableFuture<?>[] futures = new CompletableFuture[nodes.size()];
+ Map<T, LocalPartitionStateMessageByNode> result = new
ConcurrentHashMap<>();
+ CompletableFuture<?>[] futures = new
CompletableFuture[nodes.size()];
- int i = 0;
- for (NodeWithAttributes node : nodes) {
- CompletableFuture<NetworkMessage> invokeFuture =
messagingService.invoke(
- node.nodeName(),
- localPartitionStatesRequest,
- TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS)
- );
+ int i = 0;
+ for (NodeWithAttributes node : nodes) {
+ CompletableFuture<NetworkMessage> invokeFuture =
messagingService.invoke(
+ node.nodeName(),
+ localPartitionStatesRequest,
+ TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS)
+ );
- futures[i++] = invokeFuture.thenAccept(networkMessage -> {
- assert networkMessage instanceof LocalPartitionStatesResponse
: networkMessage;
+ futures[i++] = invokeFuture.thenAccept(networkMessage -> {
+ inBusyLock(busyLock, () -> {
+ assert networkMessage instanceof
LocalPartitionStatesResponse : networkMessage;
- var response = (LocalPartitionStatesResponse) networkMessage;
+ var response = (LocalPartitionStatesResponse)
networkMessage;
- for (LocalPartitionStateMessage state : response.states()) {
- result.compute(keyExtractor.apply(state), (partitionId,
messageByNode) -> {
- if (messageByNode == null) {
- return new
LocalPartitionStateMessageByNode(Map.of(node.nodeName(), state));
- }
+ for (LocalPartitionStateMessage state :
response.states()) {
+ result.compute(keyExtractor.apply(state),
(partitionId, messageByNode) -> {
+ if (messageByNode == null) {
+ return new
LocalPartitionStateMessageByNode(Map.of(node.nodeName(), state));
+ }
- messageByNode = new
LocalPartitionStateMessageByNode(messageByNode);
- messageByNode.put(node.nodeName(), state);
- return messageByNode;
+ messageByNode = new
LocalPartitionStateMessageByNode(messageByNode);
+ messageByNode.put(node.nodeName(), state);
+ return messageByNode;
+ });
+ }
});
- }
- });
- }
-
- return allOf(futures).handle((unused, err) -> {
- if (err != null) {
- throw new DisasterRecoveryException(PARTITION_STATE_ERR, err);
+ });
}
- return result;
+ return allOf(futures).handle((unused, err) -> {
+ if (err != null) {
+ throw new DisasterRecoveryException(PARTITION_STATE_ERR,
err);
+ }
+
+ return result;
+ });
});
}
@@ -822,34 +874,36 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
Set<String> nodeNames,
Set<Integer> partitionIds
) {
- try {
- Catalog catalog = catalogLatestVersion();
+ return inBusyLock(busyLock, () -> {
+ try {
+ Catalog catalog = catalogLatestVersion();
+
+ if (nodeProperties.colocationEnabled()) {
+ return localPartitionStatesInternal(
+ zoneNames,
+ nodeNames,
+ partitionIds,
+ catalog,
+ zoneState()
+ )
+ .thenCompose(res ->
tableStateForZone(toZonesOnNodes(res), catalog.version())
+ .thenApply(tableState ->
zoneStateToTableState(res, tableState, catalog))
+ )
+ .thenApply(res -> normalizeTableLocal(res,
catalog));
+ }
- if (nodeProperties.colocationEnabled()) {
return localPartitionStatesInternal(
zoneNames,
nodeNames,
partitionIds,
catalog,
- zoneState()
+ tableState()
)
- .thenCompose(res ->
tableStateForZone(toZonesOnNodes(res), catalog.version())
- .thenApply(tableState ->
zoneStateToTableState(res, tableState, catalog))
- )
.thenApply(res -> normalizeTableLocal(res, catalog));
+ } catch (Throwable t) {
+ return failedFuture(t);
}
-
- return localPartitionStatesInternal(
- zoneNames,
- nodeNames,
- partitionIds,
- catalog,
- tableState()
- )
- .thenApply(res -> normalizeTableLocal(res, catalog));
- } catch (Throwable t) {
- return failedFuture(t);
- }
+ });
}
/**
@@ -863,36 +917,38 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
Set<String> zoneNames,
Set<Integer> partitionIds
) {
- try {
- Catalog catalog = catalogLatestVersion();
+ return inBusyLock(busyLock, () -> {
+ try {
+ Catalog catalog = catalogLatestVersion();
+
+ if (nodeProperties.colocationEnabled()) {
+ return localPartitionStatesInternal(
+ zoneNames,
+ Set.of(),
+ partitionIds,
+ catalog,
+ zoneState()
+ )
+ .thenCompose(res ->
tableStateForZone(toZonesOnNodes(res), catalog.version())
+ .thenApply(tableState ->
zoneStateToTableState(res, tableState, catalog))
+ )
+ .thenApply(res -> normalizeTableLocal(res,
catalog))
+ .thenApply(res -> assembleTableGlobal(res,
partitionIds, catalog));
+ }
- if (nodeProperties.colocationEnabled()) {
return localPartitionStatesInternal(
zoneNames,
Set.of(),
partitionIds,
catalog,
- zoneState()
+ tableState()
)
- .thenCompose(res ->
tableStateForZone(toZonesOnNodes(res), catalog.version())
- .thenApply(tableState ->
zoneStateToTableState(res, tableState, catalog))
- )
.thenApply(res -> normalizeTableLocal(res, catalog))
.thenApply(res -> assembleTableGlobal(res,
partitionIds, catalog));
+ } catch (Throwable t) {
+ return failedFuture(t);
}
-
- return localPartitionStatesInternal(
- zoneNames,
- Set.of(),
- partitionIds,
- catalog,
- tableState()
- )
- .thenApply(res -> normalizeTableLocal(res, catalog))
- .thenApply(res -> assembleTableGlobal(res, partitionIds,
catalog));
- } catch (Throwable t) {
- return failedFuture(t);
- }
+ });
}
/**
@@ -1064,6 +1120,12 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
});
}
+ private static void checkOnlyOneNodeSpecified(Set<String> nodeNames) {
+ if (nodeNames.size() != 1) {
+ throw new IllegalNodesException();
+ }
+ }
+
private Set<NodeWithAttributes> getNodes(Set<String> nodeNames) throws
NodesNotFoundException {
if (nodeNames.isEmpty()) {
return dzManager.logicalTopology();
@@ -1212,19 +1274,34 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
return;
}
- request.handle(this, watchEvent.revision(),
watchEvent.timestamp()).whenComplete(copyStateTo(operationFuture));
+ request.handle(this, watchEvent.revision(),
watchEvent.timestamp())
+ .handle((res, ex) -> inBusyLock(busyLock, () -> {
+ copyStateTo(operationFuture).accept(res, ex);
+
+ if (ex != null) {
+ if (!hasCause(ex,
NodeStoppingException.class)) {
+ failureManager.process(new
FailureContext(ex, "Unable to handle disaster recovery request."));
+ }
+ }
+ return null;
+ }));
break;
case MULTI_NODE:
- CompletableFuture<Void> handleFuture = request.handle(this,
watchEvent.revision(), watchEvent.timestamp());
-
- if (operationFuture == null) {
- // We're not the initiator, or timeout has passed.
- return;
- }
-
- handleFuture.whenComplete(copyStateTo(operationFuture));
-
+ request.handle(this, watchEvent.revision(),
watchEvent.timestamp())
+ .handle((res, ex) -> inBusyLock(busyLock, () -> {
+ if (operationFuture != null) {
+ copyStateTo(operationFuture).accept(res, ex);
+ }
+
+ if (ex != null) {
+ if (!hasCause(ex, NodeStoppingException.class,
NotEnoughAliveNodesException.class)) {
+ failureManager.process(new
FailureContext(ex, "Unable to handle disaster recovery request."));
+ }
+ }
+
+ return null;
+ }));
break;
default:
var error = new AssertionError("Unexpected request type: " +
request.getClass());
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
index 0e2e59a9dbd..213d39f4023 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestHandler.java
@@ -42,6 +42,7 @@ import static
org.apache.ignite.internal.table.distributed.disaster.DisasterReco
import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.zoneState;
import static
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.CLUSTER_NOT_IDLE_ERR;
import java.util.ArrayList;
@@ -100,64 +101,67 @@ abstract class GroupUpdateRequestHandler<T extends
PartitionGroupId> {
}
public CompletableFuture<Void> handle(DisasterRecoveryManager
disasterRecoveryManager, long msRevision, HybridTimestamp msTimestamp) {
- int catalogVersion =
disasterRecoveryManager.catalogManager.activeCatalogVersion(msTimestamp.longValue());
+ return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+ int catalogVersion =
disasterRecoveryManager.catalogManager.activeCatalogVersion(msTimestamp.longValue());
- if (request.catalogVersion() != catalogVersion) {
- return failedFuture(
- new DisasterRecoveryException(CLUSTER_NOT_IDLE_ERR,
"Cluster is not idle, concurrent DDL update detected.")
- );
- }
+ if (request.catalogVersion() != catalogVersion) {
+ return failedFuture(
+ new DisasterRecoveryException(CLUSTER_NOT_IDLE_ERR,
"Cluster is not idle, concurrent DDL update detected.")
+ );
+ }
- Catalog catalog =
disasterRecoveryManager.catalogManager.catalog(catalogVersion);
+ Catalog catalog =
disasterRecoveryManager.catalogManager.catalog(catalogVersion);
- int zoneId = request.zoneId();
+ int zoneId = request.zoneId();
- CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+ CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
- Set<Integer> allZonePartitionsToReset = new HashSet<>();
-
request.partitionIds().values().forEach(allZonePartitionsToReset::addAll);
+ Set<Integer> allZonePartitionsToReset = new HashSet<>();
+
request.partitionIds().values().forEach(allZonePartitionsToReset::addAll);
- CompletableFuture<Set<String>> dataNodesFuture =
- disasterRecoveryManager.dzManager.dataNodes(msTimestamp,
catalogVersion, zoneId);
+ CompletableFuture<Set<String>> dataNodesFuture =
+ disasterRecoveryManager.dzManager.dataNodes(msTimestamp,
catalogVersion, zoneId);
- CompletableFuture<Map<T, LocalPartitionStateMessageByNode>>
localStatesFuture =
- localStatesFuture(disasterRecoveryManager,
Set.of(zoneDescriptor.name()), allZonePartitionsToReset, catalog);
+ CompletableFuture<Map<T, LocalPartitionStateMessageByNode>>
localStatesFuture =
+ localStatesFuture(disasterRecoveryManager,
Set.of(zoneDescriptor.name()), allZonePartitionsToReset, catalog);
- return dataNodesFuture.thenCombine(localStatesFuture, (dataNodes,
localStatesMap) -> {
- Set<String> nodeConsistentIds =
disasterRecoveryManager.dzManager.logicalTopology(msRevision)
- .stream()
- .map(NodeWithAttributes::nodeName)
- .collect(toSet());
+ return dataNodesFuture.thenCombine(localStatesFuture, (dataNodes,
localStatesMap) -> {
+ return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+ Set<String> nodeConsistentIds =
disasterRecoveryManager.dzManager.logicalTopology(msRevision)
+ .stream()
+ .map(NodeWithAttributes::nodeName)
+ .collect(toSet());
- List<CompletableFuture<Void>> assignmentsUpdateFuts = new
ArrayList<>(request.partitionIds().size());
+ List<CompletableFuture<Void>> assignmentsUpdateFuts = new
ArrayList<>(request.partitionIds().size());
- for (Entry<Integer, Set<Integer>> partitionEntry :
request.partitionIds().entrySet()) {
+ for (Entry<Integer, Set<Integer>> partitionEntry :
request.partitionIds().entrySet()) {
- int[] partitionIdsArray =
AssignmentUtil.partitionIds(partitionEntry.getValue(),
zoneDescriptor.partitions());
+ int[] partitionIdsArray =
AssignmentUtil.partitionIds(partitionEntry.getValue(),
zoneDescriptor.partitions());
- assignmentsUpdateFuts.add(forceAssignmentsUpdate(
- partitionEntry.getKey(),
- zoneDescriptor,
- dataNodes,
- nodeConsistentIds,
- msRevision,
- msTimestamp,
- disasterRecoveryManager.metaStorageManager,
- localStatesMap,
- catalog.time(),
- partitionIdsArray,
- request.manualUpdate()
- ));
- }
+ assignmentsUpdateFuts.add(forceAssignmentsUpdate(
+ partitionEntry.getKey(),
+ zoneDescriptor,
+ dataNodes,
+ nodeConsistentIds,
+ msRevision,
+ msTimestamp,
+ disasterRecoveryManager.metaStorageManager,
+ localStatesMap,
+ catalog.time(),
+ partitionIdsArray,
+ request.manualUpdate(),
+ disasterRecoveryManager
+ ));
+ }
- return allOf(assignmentsUpdateFuts.toArray(new
CompletableFuture[]{}));
- })
- .thenCompose(Function.identity())
- .whenComplete((unused, throwable) -> {
- // TODO: IGNITE-23635 Add fail handling for failed resetPeers
- if (throwable != null) {
- LOG.error("Failed to reset partition", throwable);
- }
+ return allOf(assignmentsUpdateFuts.toArray(new
CompletableFuture[]{}));
+ }).whenComplete((unused, throwable) -> {
+ // TODO: IGNITE-23635 Add fail handling for failed
resetPeers
+ if (throwable != null) {
+ LOG.error("Failed to reset partition", throwable);
+ }
+ });
+ }).thenCompose(Function.identity());
});
}
@@ -188,31 +192,35 @@ abstract class GroupUpdateRequestHandler<T extends
PartitionGroupId> {
Map<T, LocalPartitionStateMessageByNode> localStatesMap,
long assignmentsTimestamp,
int[] partitionIds,
- boolean manualUpdate
+ boolean manualUpdate,
+ DisasterRecoveryManager disasterRecoveryManager
) {
- CompletableFuture<Map<Integer, Assignments>> stableAssignments =
- stableAssignments(metaStorageManager, replicationId,
partitionIds);
- return stableAssignments
- .thenCompose(assignments -> {
- if (assignments.isEmpty()) {
- return nullCompletedFuture();
- }
-
- return updateAssignments(
- replicationId,
- zoneDescriptor,
- dataNodes,
- aliveNodesConsistentIds,
- revision,
- timestamp,
- metaStorageManager,
- localStatesMap,
- assignmentsTimestamp,
- partitionIds,
- assignments,
- manualUpdate
- );
- });
+ return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+ CompletableFuture<Map<Integer, Assignments>> stableAssignments =
+ stableAssignments(metaStorageManager, replicationId,
partitionIds);
+ return stableAssignments
+ .thenCompose(assignments ->
inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+ if (assignments.isEmpty()) {
+ return nullCompletedFuture();
+ }
+
+ return updateAssignments(
+ replicationId,
+ zoneDescriptor,
+ dataNodes,
+ aliveNodesConsistentIds,
+ revision,
+ timestamp,
+ metaStorageManager,
+ localStatesMap,
+ assignmentsTimestamp,
+ partitionIds,
+ assignments,
+ manualUpdate,
+ disasterRecoveryManager
+ );
+ }));
+ });
}
private CompletableFuture<Void> updateAssignments(
@@ -227,40 +235,44 @@ abstract class GroupUpdateRequestHandler<T extends
PartitionGroupId> {
long assignmentsTimestamp,
int[] partitionIds,
Map<Integer, Assignments> stableAssignments,
- boolean manualUpdate
+ boolean manualUpdate,
+ DisasterRecoveryManager disasterRecoveryManager
) {
- Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes,
aliveNodesConsistentIds);
-
- CompletableFuture<?>[] futures = new
CompletableFuture[partitionIds.length];
-
- for (int i = 0; i < partitionIds.length; i++) {
- T replicaGrpId = replicationGroupId(replicationId,
partitionIds[i]);
- LocalPartitionStateMessageByNode localStatesByNode =
localStatesMap.containsKey(replicaGrpId)
- ? localStatesMap.get(replicaGrpId)
- : new LocalPartitionStateMessageByNode(emptyMap());
-
- futures[i] = partitionUpdate(
- replicaGrpId,
- aliveDataNodes,
- aliveNodesConsistentIds,
- zoneDescriptor.partitions(),
- zoneDescriptor.replicas(),
- zoneDescriptor.consensusGroupSize(),
- revision,
- timestamp,
- metaStorageManager,
- stableAssignments.get(replicaGrpId.partitionId()).nodes(),
- localStatesByNode,
- assignmentsTimestamp,
- manualUpdate
- ).thenAccept(res -> {
- DisasterRecoveryManager.LOG.info(
- "Partition {} returned {} status on reset attempt",
replicaGrpId, UpdateStatus.valueOf(res)
- );
- });
- }
+ return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+ Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes,
aliveNodesConsistentIds);
+
+ CompletableFuture<?>[] futures = new
CompletableFuture[partitionIds.length];
+
+ for (int i = 0; i < partitionIds.length; i++) {
+ T replicaGrpId = replicationGroupId(replicationId,
partitionIds[i]);
+ LocalPartitionStateMessageByNode localStatesByNode =
localStatesMap.containsKey(replicaGrpId)
+ ? localStatesMap.get(replicaGrpId)
+ : new LocalPartitionStateMessageByNode(emptyMap());
+
+ futures[i] = partitionUpdate(
+ replicaGrpId,
+ aliveDataNodes,
+ aliveNodesConsistentIds,
+ zoneDescriptor.partitions(),
+ zoneDescriptor.replicas(),
+ zoneDescriptor.consensusGroupSize(),
+ revision,
+ timestamp,
+ metaStorageManager,
+
stableAssignments.get(replicaGrpId.partitionId()).nodes(),
+ localStatesByNode,
+ assignmentsTimestamp,
+ manualUpdate,
+ disasterRecoveryManager
+ ).thenAccept(res -> {
+ DisasterRecoveryManager.LOG.info(
+ "Partition {} returned {} status on reset
attempt", replicaGrpId, UpdateStatus.valueOf(res)
+ );
+ });
+ }
- return allOf(futures);
+ return allOf(futures);
+ });
}
private CompletableFuture<Integer> partitionUpdate(
@@ -276,51 +288,54 @@ abstract class GroupUpdateRequestHandler<T extends
PartitionGroupId> {
Set<Assignment> currentAssignments,
LocalPartitionStateMessageByNode localPartitionStateMessageByNode,
long assignmentsTimestamp,
- boolean manualUpdate
+ boolean manualUpdate,
+ DisasterRecoveryManager disasterRecoveryManager
) {
- Set<Assignment> partAssignments =
getAliveNodesWithData(aliveNodesConsistentIds,
localPartitionStateMessageByNode);
- Set<Assignment> aliveStableNodes =
CollectionUtils.intersect(currentAssignments, partAssignments);
+ return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+ Set<Assignment> partAssignments =
getAliveNodesWithData(aliveNodesConsistentIds,
localPartitionStateMessageByNode);
+ Set<Assignment> aliveStableNodes =
CollectionUtils.intersect(currentAssignments, partAssignments);
- if (aliveStableNodes.size() >= (replicas / 2 + 1)) {
- return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
- }
+ if (aliveStableNodes.size() >= (replicas / 2 + 1)) {
+ return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
+ }
- if (aliveStableNodes.isEmpty() && !manualUpdate) {
- return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
- }
+ if (aliveStableNodes.isEmpty() && !manualUpdate) {
+ return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
+ }
- if (manualUpdate) {
- enrichAssignments(partId, aliveDataNodes, partitions, replicas,
consensusGroupSize, partAssignments);
- }
+ if (manualUpdate) {
+ enrichAssignments(partId, aliveDataNodes, partitions,
replicas, consensusGroupSize, partAssignments);
+ }
- Assignment nextAssignment =
nextAssignment(localPartitionStateMessageByNode, partAssignments);
+ Assignment nextAssignment =
nextAssignment(localPartitionStateMessageByNode, partAssignments);
- boolean isProposedPendingEqualsProposedPlanned =
partAssignments.size() == 1;
+ boolean isProposedPendingEqualsProposedPlanned =
partAssignments.size() == 1;
- assert partAssignments.contains(nextAssignment) :
IgniteStringFormatter.format(
- "Recovery nodes set doesn't contain the reset node assignment
[partAssignments={}, nextAssignment={}]",
- partAssignments,
- nextAssignment
- );
+ assert partAssignments.contains(nextAssignment) :
IgniteStringFormatter.format(
+ "Recovery nodes set doesn't contain the reset node
assignment [partAssignments={}, nextAssignment={}]",
+ partAssignments,
+ nextAssignment
+ );
- // There are nodes with data, and we set pending assignments to this
set of nodes. It'll be the source of peers for
- // "resetPeers", and after that new assignments with restored replica
factor wil be picked up from planned assignments
- // for the case of the manual update, that was triggered by a user.
- AssignmentsQueue assignmentsQueue = pendingAssignmentsCalculator()
- .stable(Assignments.of(currentAssignments,
assignmentsTimestamp))
- .target(Assignments.forced(Set.of(nextAssignment),
assignmentsTimestamp))
- .toQueue();
-
- return invoke(
- partId,
- revision,
- timestamp,
- metaStorageMgr,
- assignmentsTimestamp,
- assignmentsQueue,
- isProposedPendingEqualsProposedPlanned,
- partAssignments
- );
+ // There are nodes with data, and we set pending assignments to
this set of nodes. It'll be the source of peers for
+ // "resetPeers", and after that new assignments with restored
replica factor wil be picked up from planned assignments
+ // for the case of the manual update, that was triggered by a user.
+ AssignmentsQueue assignmentsQueue = pendingAssignmentsCalculator()
+ .stable(Assignments.of(currentAssignments,
assignmentsTimestamp))
+ .target(Assignments.forced(Set.of(nextAssignment),
assignmentsTimestamp))
+ .toQueue();
+
+ return invoke(
+ partId,
+ revision,
+ timestamp,
+ metaStorageMgr,
+ assignmentsTimestamp,
+ assignmentsQueue,
+ isProposedPendingEqualsProposedPlanned,
+ partAssignments
+ );
+ });
}
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
index 4d3277a15bc..8bc008ba83a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupRestartRequest.java
@@ -26,7 +26,7 @@ import static
org.apache.ignite.internal.table.distributed.disaster.DisasterReco
import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryRequestType.MULTI_NODE;
import static
org.apache.ignite.internal.table.distributed.disaster.GroupUpdateRequestHandler.getAliveNodesWithData;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.RESTART_WITH_CLEAN_UP_ERR;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import java.util.ArrayList;
import java.util.Arrays;
@@ -49,7 +49,7 @@ import org.apache.ignite.internal.replicator.PartitionGroupId;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
-import
org.apache.ignite.internal.table.distributed.disaster.exceptions.DisasterRecoveryException;
+import
org.apache.ignite.internal.table.distributed.disaster.exceptions.NotEnoughAliveNodesException;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.CollectionUtils;
@@ -123,30 +123,33 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
@Override
public CompletableFuture<Void> handle(DisasterRecoveryManager
disasterRecoveryManager, long revision, HybridTimestamp timestamp) {
- if (!nodeNames.isEmpty() &&
!nodeNames.contains(disasterRecoveryManager.localNode().name())) {
- return nullCompletedFuture();
- }
+ return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+ if (!nodeNames.isEmpty() &&
!nodeNames.contains(disasterRecoveryManager.localNode().name())) {
+ return nullCompletedFuture();
+ }
- Catalog catalog =
disasterRecoveryManager.catalogManager.activeCatalog(timestamp.longValue());
- CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
+ Catalog catalog =
disasterRecoveryManager.catalogManager.activeCatalog(timestamp.longValue());
+ CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);
- var restartFutures = new ArrayList<CompletableFuture<?>>();
+ var restartFutures = new ArrayList<CompletableFuture<?>>();
- disasterRecoveryManager.raftManager.forEach((raftNodeId,
raftGroupService) -> {
- ReplicationGroupId replicationGroupId = raftNodeId.groupId();
+ disasterRecoveryManager.raftManager.forEach((raftNodeId,
raftGroupService) -> {
+ ReplicationGroupId replicationGroupId = raftNodeId.groupId();
- if (shouldProcessPartition(replicationGroupId, zoneDescriptor)) {
- if (cleanUp) {
- restartFutures.add(
-
createRestartWithCleanupFuture(disasterRecoveryManager, replicationGroupId,
revision, zoneDescriptor, catalog)
- );
- } else {
-
restartFutures.add(createRestartFuture(disasterRecoveryManager,
replicationGroupId, revision));
+ if (shouldProcessPartition(replicationGroupId,
zoneDescriptor)) {
+ if (cleanUp) {
+ restartFutures.add(
+
createRestartWithCleanupFuture(disasterRecoveryManager, replicationGroupId,
revision, zoneDescriptor,
+ catalog)
+ );
+ } else {
+
restartFutures.add(createRestartFuture(disasterRecoveryManager,
replicationGroupId, revision));
+ }
}
- }
- });
+ });
- return restartFutures.isEmpty() ? nullCompletedFuture() :
allOf(restartFutures.toArray(CompletableFuture[]::new));
+ return restartFutures.isEmpty() ? nullCompletedFuture() :
allOf(restartFutures.toArray(CompletableFuture[]::new));
+ });
}
private boolean shouldProcessPartition(ReplicationGroupId
replicationGroupId, CatalogZoneDescriptor zoneDescriptor) {
@@ -216,38 +219,37 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
CatalogZoneDescriptor zoneDescriptor,
Catalog catalog
) {
- if (zoneDescriptor.consistencyMode() ==
ConsistencyMode.HIGH_AVAILABILITY) {
- if (zoneDescriptor.replicas() >= 2) {
- return createCleanupRestartFuture(disasterRecoveryManager,
replicationGroupId, revision);
- } else {
- return notEnoughAliveNodes();
- }
- } else {
- if (zoneDescriptor.replicas() <= 2) {
- return notEnoughAliveNodes();
- }
-
- return enoughAliveNodesToRestartWithCleanUp(
- disasterRecoveryManager,
- revision,
- replicationGroupId,
- zoneDescriptor,
- catalog
- ).thenCompose(enoughNodes -> {
- if (enoughNodes) {
+ return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+ if (zoneDescriptor.consistencyMode() ==
ConsistencyMode.HIGH_AVAILABILITY) {
+ if (zoneDescriptor.replicas() >= 2) {
return createCleanupRestartFuture(disasterRecoveryManager,
replicationGroupId, revision);
} else {
return notEnoughAliveNodes();
}
- });
- }
+ } else {
+ if (zoneDescriptor.replicas() <= 2) {
+ return notEnoughAliveNodes();
+ }
+
+ return enoughAliveNodesToRestartWithCleanUp(
+ disasterRecoveryManager,
+ revision,
+ replicationGroupId,
+ zoneDescriptor,
+ catalog
+ ).thenCompose(enoughNodes -> {
+ if (enoughNodes) {
+ return
createCleanupRestartFuture(disasterRecoveryManager, replicationGroupId,
revision);
+ } else {
+ return notEnoughAliveNodes();
+ }
+ });
+ }
+ });
}
private static <U> CompletableFuture<U> notEnoughAliveNodes() {
- return CompletableFuture.failedFuture(
- new DisasterRecoveryException(RESTART_WITH_CLEAN_UP_ERR, "Not
enough alive nodes "
- + "to perform reset with clean up.")
- );
+ return CompletableFuture.failedFuture(new
NotEnoughAliveNodesException());
}
private static CompletableFuture<Boolean>
enoughAliveNodesToRestartWithCleanUp(
@@ -303,29 +305,33 @@ class ManualGroupRestartRequest implements
DisasterRecoveryRequest {
Function<LocalPartitionStateMessage, T> keyExtractor,
CompletableFuture<Map<Integer, Assignments>> stableAssignments
) {
- Set<String> aliveNodesConsistentIds =
disasterRecoveryManager.dzManager.logicalTopology(msRevision)
- .stream()
- .map(NodeWithAttributes::nodeName)
- .collect(Collectors.toSet());
-
- CompletableFuture<Map<T, LocalPartitionStateMessageByNode>>
localStatesFuture =
- disasterRecoveryManager.localPartitionStatesInternal(
- Set.of(zoneDescriptor.name()),
- emptySet(),
- Set.of(partitionGroupId.partitionId()),
- catalog,
- keyExtractor
- );
+ return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+ Set<String> aliveNodesConsistentIds =
disasterRecoveryManager.dzManager.logicalTopology(msRevision)
+ .stream()
+ .map(NodeWithAttributes::nodeName)
+ .collect(Collectors.toSet());
+
+ CompletableFuture<Map<T, LocalPartitionStateMessageByNode>>
localStatesFuture =
+ disasterRecoveryManager.localPartitionStatesInternal(
+ Set.of(zoneDescriptor.name()),
+ emptySet(),
+ Set.of(partitionGroupId.partitionId()),
+ catalog,
+ keyExtractor
+ );
- return localStatesFuture.thenCombine(stableAssignments,
(localPartitionStatesMap, currentAssignments) -> {
- LocalPartitionStateMessageByNode localPartitionStateMessageByNode
= localPartitionStatesMap.get(partitionGroupId);
+ return localStatesFuture.thenCombine(stableAssignments,
(localPartitionStatesMap, currentAssignments) -> {
+ return inBusyLock(disasterRecoveryManager.busyLock(), () -> {
+ LocalPartitionStateMessageByNode
localPartitionStateMessageByNode =
localPartitionStatesMap.get(partitionGroupId);
- Set<Assignment> partAssignments =
getAliveNodesWithData(aliveNodesConsistentIds,
localPartitionStateMessageByNode);
- Set<Assignment> currentStableAssignments =
currentAssignments.get(partitionGroupId.partitionId()).nodes();
+ Set<Assignment> partAssignments =
getAliveNodesWithData(aliveNodesConsistentIds,
localPartitionStateMessageByNode);
+ Set<Assignment> currentStableAssignments =
currentAssignments.get(partitionGroupId.partitionId()).nodes();
- Set<Assignment> aliveStableNodes =
CollectionUtils.intersect(currentStableAssignments, partAssignments);
+ Set<Assignment> aliveStableNodes =
CollectionUtils.intersect(currentStableAssignments, partAssignments);
- return aliveStableNodes.size() > (zoneDescriptor.replicas() / 2 +
1);
+ return aliveStableNodes.size() >
(zoneDescriptor.replicas() / 2 + 1);
+ });
+ });
});
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/IllegalNodesException.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/IllegalNodesException.java
new file mode 100644
index 00000000000..836d8ac0f4a
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/IllegalNodesException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
+
+import static
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.ILLEGAL_NODES_SET_ERR;
+
+/** Exception is thrown when zero or more than one node name is specified for
the operation. */
+public class IllegalNodesException extends DisasterRecoveryException {
+ private static final long serialVersionUID = -7959192788912267028L;
+
+ /** Creates an exception indicating that exactly one node name must be
+ public IllegalNodesException() {
+ super(ILLEGAL_NODES_SET_ERR, "Only one node name should be specified
for the operation.");
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NotEnoughAliveNodesException.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NotEnoughAliveNodesException.java
new file mode 100644
index 00000000000..1a1064529fa
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/NotEnoughAliveNodesException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
+
+import static
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.NOT_ENOUGH_ALIVE_NODES_ERR;
+
+/**
+ * Exception is thrown when there are not enough alive nodes to perform reset
with clean up.
+ */
+public class NotEnoughAliveNodesException extends DisasterRecoveryException {
+ private static final long serialVersionUID = -8864260391821773729L;
+
+ public NotEnoughAliveNodesException() {
+ super(NOT_ENOUGH_ALIVE_NODES_ERR, "Not enough alive nodes to perform
reset with clean up.");
+ }
+}