This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9d4b2852cb IGNITE-23571 resetPartitions improvements: do not try to
recover replica factor after not manual reset (#4686)
9d4b2852cb is described below
commit 9d4b2852cbcf6c2414f2b6eaaaa9b742a3575762
Author: Mirza Aliev <[email protected]>
AuthorDate: Fri Nov 8 13:00:22 2024 +0400
IGNITE-23571 resetPartitions improvements: do not try to recover replica
factor after not manual reset (#4686)
---
.../rest/recovery/DisasterRecoveryController.java | 3 +-
.../ItDisasterRecoveryReconfigurationTest.java | 243 +++++++++++++++++++--
.../disaster/DisasterRecoveryManager.java | 10 +-
.../DisasterRecoveryRequestSerializer.java | 6 +-
...pUpdateRequest.java => GroupUpdateRequest.java} | 33 ++-
...izer.java => GroupUpdateRequestSerializer.java} | 14 +-
.../DisasterRecoveryRequestSerializerTest.java | 19 +-
7 files changed, 277 insertions(+), 51 deletions(-)
diff --git
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
index d14c730190..fcec2e49a1 100644
---
a/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
+++
b/modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java
@@ -87,7 +87,8 @@ public class DisasterRecoveryController implements
DisasterRecoveryApi, Resource
return disasterRecoveryManager.resetPartitions(
command.zoneName(),
command.tableName(),
- command.partitionIds()
+ command.partitionIds(),
+ true
);
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index dea685073b..405e8b2775 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -27,6 +27,8 @@ import static
org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableManager;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static
org.apache.ignite.internal.replicator.configuration.ReplicationConfigurationSchema.DEFAULT_IDLE_SAFE_TIME_PROP_DURATION;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
@@ -44,6 +46,7 @@ import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -67,6 +70,7 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.OperationType;
@@ -98,6 +102,7 @@ import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -234,7 +239,8 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetPartitions(
zoneName,
QUALIFIED_TABLE_NAME,
- Set.of()
+ Set.of(),
+ true
);
assertThat(updateFuture, willSucceedIn(60, SECONDS));
@@ -280,7 +286,8 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetPartitions(
zoneName,
QUALIFIED_TABLE_NAME,
- Set.of(anotherPartId)
+ Set.of(anotherPartId),
+ true
);
assertThat(updateFuture, willSucceedIn(60, SECONDS));
@@ -319,7 +326,8 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetPartitions(
zoneName,
QUALIFIED_TABLE_NAME,
- Set.of()
+ Set.of(),
+ true
);
assertThat(updateFuture, willCompleteSuccessfully());
@@ -355,12 +363,6 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
int catalogVersion = node0.catalogManager().latestCatalogVersion();
long timestamp = node0.catalogManager().catalog(catalogVersion).time();
- Assignments assignment013 = Assignments.of(timestamp,
- Assignment.forPeer(node(0).name()),
- Assignment.forPeer(node(1).name()),
- Assignment.forPeer(node(3).name())
- );
-
Table table = node0.tables().table(TABLE_NAME);
awaitPrimaryReplica(node0, partId);
@@ -379,18 +381,16 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
assertRealAssignments(node0, partId, 0, 1, 3);
-
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).forEach(node -> {
- BiPredicate<String, NetworkMessage> newPredicate = (nodeName, msg)
-> stableKeySwitchMessage(msg, partId, assignment013);
- BiPredicate<String, NetworkMessage> oldPredicate =
node.dropMessagesPredicate();
+ Assignments assignment013 = Assignments.of(timestamp,
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(3).name())
+ );
- if (oldPredicate == null) {
- node.dropMessages(newPredicate);
- } else {
- node.dropMessages(oldPredicate.or(newPredicate));
- }
- });
+ blockRebalanceStableSwitch(partId, assignment013);
- CompletableFuture<Void> resetFuture =
node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME,
emptySet());
+ CompletableFuture<Void> resetFuture =
+ node0.disasterRecoveryManager().resetPartitions(zoneName,
QUALIFIED_TABLE_NAME, emptySet(), true);
assertThat(resetFuture, willCompleteSuccessfully());
waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
@@ -407,7 +407,7 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
- resetFuture =
node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME,
emptySet());
+ resetFuture =
node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME,
emptySet(), true);
assertThat(resetFuture, willCompleteSuccessfully());
waitForPartitionState(node0, GlobalPartitionStateEnum.AVAILABLE);
@@ -434,6 +434,166 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
});
}
+ /**
+ * Tests that in a situation from the test {@link
#testInsertFailsIfMajorityIsLost()} it is possible to recover partition using a
+ * disaster recovery API, but with manual flag set to false. We expect
that in this case the replica factor won't be restored.
+ * In this test, assignments will be (1, 3, 4), according to {@link
RendezvousDistributionFunction}.
+ */
+ @Test
+ @ZoneParams(nodes = 5, replicas = 3, partitions = 1)
+ void testAutomaticRebalanceIfMajorityIsLost() throws Exception {
+ int partId = 0;
+
+ IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
+ Table table = node0.tables().table(TABLE_NAME);
+
+ awaitPrimaryReplica(node0, partId);
+
+ assertRealAssignments(node0, partId, 1, 3, 4);
+
+ stopNodesInParallel(3, 4);
+
+ waitForScale(node0, 3);
+
+ CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetPartitions(
+ zoneName,
+ QUALIFIED_TABLE_NAME,
+ Set.of(),
+ false
+ );
+
+ assertThat(updateFuture, willSucceedIn(60, SECONDS));
+
+ awaitPrimaryReplica(node0, partId);
+
+ assertRealAssignments(node0, partId, 1);
+
+ List<Throwable> errors = insertValues(table, partId, 0);
+ assertThat(errors, is(empty()));
+
+ // Check that there is no ongoing or planned rebalance.
+ assertNull(getPendingAssignments(node0, partId));
+
+ assertRealAssignments(node0, partId, 1);
+ }
+
+ /**
+ * Tests a scenario where there's a single partition on a node 1, and the
node that hosts it is lost.
+ * A non-manual reset should do nothing in that case, so no new pending or
planned assignments are present.
+ */
+ @Test
+ @ZoneParams(nodes = 2, replicas = 1, partitions = 1)
+ void testAutomaticRebalanceIfPartitionIsLost() throws Exception {
+ int partId = 0;
+
+ IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
+
+ IgniteImpl node1 = unwrapIgniteImpl(cluster.node(1));
+
+ executeSql(format("ALTER ZONE %s SET
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+ awaitPrimaryReplica(node0, partId);
+
+ assertRealAssignments(node0, partId, 1);
+
+ stopNodesInParallel(1);
+
+ CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetPartitions(
+ zoneName,
+ QUALIFIED_TABLE_NAME,
+ Set.of(),
+ false
+ );
+
+ assertThat(updateFuture, willCompleteSuccessfully());
+
+ // Check that there is no ongoing or planned rebalance.
+ assertNull(getPendingAssignments(node0, partId));
+
+ assertEquals(1, getStableAssignments(node0, partId).nodes().size());
+
+ assertEquals(node1.name(), getStableAssignments(node0,
partId).nodes().stream().findFirst().get().consistentId());
+ }
+
+ /**
+ * Tests a scenario where only one node from stable is left, but we have
a node in pending nodes and perform reset partition operation.
+ * We expect this node from pending to be present after reset, so the
non-manual reset logic takes pending nodes into account.
+ *
+ * <p>It goes like this:
+ * <ul>
+ * <li>We have 6 nodes and a partition on nodes 1, 4 and 5.</li>
+ * <li>We stop node 5, so rebalance on 1, 3, 4 is triggered, but
blocked and cannot be finished.</li>
+ * <li>The zone's scale down is set to an infinite value, so when we stop node 4
a new rebalance is not triggered and majority is lost.</li>
+ * <li>We execute "resetPartitions" and expect that pending
assignments will be 1, 3, so node 3 from pending is present.</li>
+ * </ul>
+ */
+ @Test
+ @ZoneParams(nodes = 6, replicas = 3, partitions = 1)
+ public void testIncompleteRebalanceBeforeAutomaticResetPartitions() throws
Exception {
+ int partId = 0;
+
+ IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
+
+ int catalogVersion = node0.catalogManager().latestCatalogVersion();
+ long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+
+ Table table = node0.tables().table(TABLE_NAME);
+
+ awaitPrimaryReplica(node0, partId);
+ assertRealAssignments(node0, partId, 1, 4, 5);
+
+ insertValues(table, partId, 0);
+
+ Assignments assignment134 = Assignments.of(timestamp,
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(3).name()),
+ Assignment.forPeer(node(4).name())
+ );
+
+ blockRebalanceStableSwitch(partId, assignment134);
+
+ stopNode(5);
+
+ waitForScale(node0, 5);
+
+ assertRealAssignments(node0, partId, 1, 3, 4);
+
+ assertPendingAssignments(node0, partId, assignment134);
+
+ assertFalse(getPendingAssignments(node0, partId).force());
+
+ waitForPartitionState(node0, GlobalPartitionStateEnum.AVAILABLE);
+
+ executeSql(format("ALTER ZONE %s SET
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+ stopNode(4);
+
+ CompletableFuture<Void> resetFuture =
+ node0.disasterRecoveryManager().resetPartitions(zoneName,
QUALIFIED_TABLE_NAME, emptySet(), false);
+ assertThat(resetFuture, willCompleteSuccessfully());
+
+ Assignments assignmentForced13 =
Assignments.forced(Set.of(Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(3).name())), timestamp);
+
+ assertPendingAssignments(node0, partId, assignmentForced13);
+ }
+
+ /**
+ * Block rebalance, so stable won't be switched to specified pending.
+ */
+ private void blockRebalanceStableSwitch(int partId, Assignments
assignment) {
+
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).forEach(node -> {
+ BiPredicate<String, NetworkMessage> newPredicate = (nodeName, msg)
-> stableKeySwitchMessage(msg, partId, assignment);
+ BiPredicate<String, NetworkMessage> oldPredicate =
node.dropMessagesPredicate();
+
+ if (oldPredicate == null) {
+ node.dropMessages(newPredicate);
+ } else {
+ node.dropMessages(oldPredicate.or(newPredicate));
+ }
+ });
+ }
+
private boolean stableKeySwitchMessage(NetworkMessage msg, int partId,
Assignments blockedAssignments) {
if (msg instanceof WriteActionRequest) {
var writeActionRequest = (WriteActionRequest) msg;
@@ -476,7 +636,10 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
GlobalPartitionState state = map.values().iterator().next();
return state.state == expectedState;
- }, 500, 20_000));
+ }, 500, 20_000),
+ () -> "Expected state: " + expectedState
+ + ", actual: " +
node0.disasterRecoveryManager().globalPartitionStates(Set.of(zoneName),
emptySet()).join()
+ );
}
private void triggerRaftSnapshot(int nodeIdx, int partId) throws
InterruptedException, ExecutionException {
@@ -504,7 +667,17 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
}
private void assertRealAssignments(IgniteImpl node0, int partId,
Integer... expected) throws InterruptedException {
- assertTrue(waitForCondition(() ->
List.of(expected).equals(getRealAssignments(node0, partId)), 2000));
+ assertTrue(
+ waitForCondition(() ->
List.of(expected).equals(getRealAssignments(node0, partId)), 2000),
+ () -> "Expected: " + List.of(expected) + ", actual: " +
getRealAssignments(node0, partId)
+ );
+ }
+
+ private void assertPendingAssignments(IgniteImpl node0, int partId,
Assignments expected) throws InterruptedException {
+ assertTrue(
+ waitForCondition(() ->
expected.equals(getPendingAssignments(node0, partId)), 2000),
+ () -> "Expected: " + expected + ", actual: " +
getPendingAssignments(node0, partId)
+ );
}
/**
@@ -585,6 +758,10 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
}, 250, SECONDS.toMillis(60)));
}
+ /**
+ * Returns assignments based on states of partitions in the cluster. It is
possible that the returned value contains nodes
+ * from both stable and pending, for example, when rebalance is in progress.
+ */
private List<Integer> getRealAssignments(IgniteImpl node0, int partId) {
CompletableFuture<Map<TablePartitionId, LocalPartitionStateByNode>>
partitionStatesFut = node0.disasterRecoveryManager()
.localPartitionStates(Set.of(zoneName), Set.of(), Set.of());
@@ -599,6 +776,28 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
.collect(Collectors.toList());
}
+ private @Nullable Assignments getPendingAssignments(IgniteImpl node0, int
partId) {
+ CompletableFuture<Entry> pendingFut = node0.metaStorageManager()
+ .get(pendingPartAssignmentsKey(new TablePartitionId(tableId,
partId)));
+
+ assertThat(pendingFut, willCompleteSuccessfully());
+
+ Entry pending = pendingFut.join();
+
+ return pending.empty() ? null : Assignments.fromBytes(pending.value());
+ }
+
+ private @Nullable Assignments getStableAssignments(IgniteImpl node0, int
partId) {
+ CompletableFuture<Entry> stableFut = node0.metaStorageManager()
+ .get(stablePartAssignmentsKey(new TablePartitionId(tableId,
partId)));
+
+ assertThat(stableFut, willCompleteSuccessfully());
+
+ Entry stable = stableFut.join();
+
+ return stable.empty() ? null : Assignments.fromBytes(stable.value());
+ }
+
@Retention(RetentionPolicy.RUNTIME)
@interface ZoneParams {
int replicas();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index a30aedf306..b411af60c0 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -102,7 +102,8 @@ import org.jetbrains.annotations.Nullable;
/**
* Manager, responsible for "disaster recovery" operations.
* Internally it triggers meta-storage updates, in order to acquire unique
causality token.
- * As a reaction to these updates, manager performs actual recovery
operations, such as {@link #resetPartitions(String, String, Set)}.
+ * As a reaction to these updates, manager performs actual recovery operations,
+ * such as {@link #resetPartitions(String, String, Set, boolean)}.
* More details are in the <a
href="https://issues.apache.org/jira/browse/IGNITE-21140">epic</a>.
*/
public class DisasterRecoveryManager implements IgniteComponent,
SystemViewProvider {
@@ -247,9 +248,10 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
* @param zoneName Name of the distribution zone. Case-sensitive, without
quotes.
* @param tableName Fully-qualified table name. Case-sensitive, without
quotes. Example: "PUBLIC.Foo".
* @param partitionIds IDs of partitions to reset. If empty, reset all
zone's partitions.
+ * @param manualUpdate Whether the update is triggered manually by user or
automatically by core logic.
* @return Future that completes when partitions are reset.
*/
- public CompletableFuture<Void> resetPartitions(String zoneName, String
tableName, Set<Integer> partitionIds) {
+ public CompletableFuture<Void> resetPartitions(String zoneName, String
tableName, Set<Integer> partitionIds, boolean manualUpdate) {
try {
Catalog catalog = catalogLatestVersion();
@@ -259,7 +261,9 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
checkPartitionsRange(partitionIds, Set.of(zone));
- return processNewRequest(new
ManualGroupUpdateRequest(UUID.randomUUID(), catalog.version(), zone.id(),
tableId, partitionIds));
+ return processNewRequest(
+ new GroupUpdateRequest(UUID.randomUUID(),
catalog.version(), zone.id(), tableId, partitionIds, manualUpdate)
+ );
} catch (Throwable t) {
return failedFuture(t);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializer.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializer.java
index 4a7c96c74e..7f407af488 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializer.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializer.java
@@ -50,7 +50,7 @@ class DisasterRecoveryRequestSerializer extends
VersionedSerializer<DisasterReco
}
private enum Operation {
- MANUAL_GROUP_UPDATE(0, ManualGroupUpdateRequestSerializer.INSTANCE),
+ GROUP_UPDATE(0, GroupUpdateRequestSerializer.INSTANCE),
MANUAL_GROUP_RESTART(1, ManualGroupRestartRequestSerializer.INSTANCE);
private static final Map<Integer, Operation> valuesByCode =
Arrays.stream(values())
@@ -75,8 +75,8 @@ class DisasterRecoveryRequestSerializer extends
VersionedSerializer<DisasterReco
}
static Operation findByRequest(DisasterRecoveryRequest request) {
- if (request instanceof ManualGroupUpdateRequest) {
- return MANUAL_GROUP_UPDATE;
+ if (request instanceof GroupUpdateRequest) {
+ return GROUP_UPDATE;
}
if (request instanceof ManualGroupRestartRequest) {
return MANUAL_GROUP_RESTART;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
similarity index 94%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
rename to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
index fb05b50d16..2abaffaf99 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
@@ -73,7 +73,7 @@ import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.CollectionUtils;
import org.jetbrains.annotations.Nullable;
-class ManualGroupUpdateRequest implements DisasterRecoveryRequest {
+class GroupUpdateRequest implements DisasterRecoveryRequest {
private final UUID operationId;
/**
@@ -87,12 +87,15 @@ class ManualGroupUpdateRequest implements
DisasterRecoveryRequest {
private final Set<Integer> partitionIds;
- ManualGroupUpdateRequest(UUID operationId, int catalogVersion, int zoneId,
int tableId, Set<Integer> partitionIds) {
+ private final boolean manualUpdate;
+
+ GroupUpdateRequest(UUID operationId, int catalogVersion, int zoneId, int
tableId, Set<Integer> partitionIds, boolean manualUpdate) {
this.operationId = operationId;
this.catalogVersion = catalogVersion;
this.zoneId = zoneId;
this.tableId = tableId;
this.partitionIds = Set.copyOf(partitionIds);
+ this.manualUpdate = manualUpdate;
}
@Override
@@ -122,6 +125,10 @@ class ManualGroupUpdateRequest implements
DisasterRecoveryRequest {
return partitionIds;
}
+ public boolean manualUpdate() {
+ return manualUpdate;
+ }
+
@Override
public CompletableFuture<Void> handle(DisasterRecoveryManager
disasterRecoveryManager, long msRevision, HybridTimestamp msTimestamp) {
int catalogVersion =
disasterRecoveryManager.catalogManager.activeCatalogVersion(msTimestamp.longValue());
@@ -206,7 +213,7 @@ class ManualGroupUpdateRequest implements
DisasterRecoveryRequest {
TablePartitionId replicaGrpId = new
TablePartitionId(tableDescriptor.id(), partitionIdsArray[partitionId]);
futures[partitionId] =
tableAssignmentsFut.thenCompose(tableAssignments ->
- tableAssignments.isEmpty() ? nullCompletedFuture() :
manualPartitionUpdate(
+ tableAssignments.isEmpty() ? nullCompletedFuture() :
partitionUpdate(
replicaGrpId,
aliveDataNodes,
aliveNodesConsistentIds,
@@ -215,7 +222,8 @@ class ManualGroupUpdateRequest implements
DisasterRecoveryRequest {
metaStorageManager,
tableAssignments.get(replicaGrpId.partitionId()).nodes(),
localStatesMap.get(replicaGrpId),
- assignmentsTimestamp
+ assignmentsTimestamp,
+ this.manualUpdate
)).thenAccept(res -> {
DisasterRecoveryManager.LOG.info(
"Partition {} returned {} status on reset
attempt", replicaGrpId, UpdateStatus.valueOf(res)
@@ -227,7 +235,7 @@ class ManualGroupUpdateRequest implements
DisasterRecoveryRequest {
return futures;
}
- private static CompletableFuture<Integer> manualPartitionUpdate(
+ private static CompletableFuture<Integer> partitionUpdate(
TablePartitionId partId,
Collection<String> aliveDataNodes,
Set<String> aliveNodesConsistentIds,
@@ -236,7 +244,8 @@ class ManualGroupUpdateRequest implements
DisasterRecoveryRequest {
MetaStorageManager metaStorageMgr,
Set<Assignment> currentAssignments,
LocalPartitionStateMessageByNode localPartitionStateMessageByNode,
- long assignmentsTimestamp
+ long assignmentsTimestamp,
+ boolean manualUpdate
) {
Set<Assignment> partAssignments =
getAliveNodesWithData(aliveNodesConsistentIds,
localPartitionStateMessageByNode);
Set<Assignment> aliveStableNodes =
CollectionUtils.intersect(currentAssignments, partAssignments);
@@ -248,6 +257,10 @@ class ManualGroupUpdateRequest implements
DisasterRecoveryRequest {
Iif invokeClosure;
if (aliveStableNodes.isEmpty()) {
+ if (!manualUpdate) {
+ return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
+ }
+
enrichAssignments(partId, aliveDataNodes, replicas,
partAssignments);
// There are no known nodes with data, which means that we can
just put new assignments into pending assignments with "forced"
@@ -260,10 +273,14 @@ class ManualGroupUpdateRequest implements
DisasterRecoveryRequest {
);
} else {
Set<Assignment> stableAssignments = Set.copyOf(partAssignments);
- enrichAssignments(partId, aliveDataNodes, replicas,
partAssignments);
+
+ if (manualUpdate) {
+ enrichAssignments(partId, aliveDataNodes, replicas,
partAssignments);
+ }
// There are nodes with data, and we set pending assignments to
this set of nodes. It'll be the source of peers for
- // "resetPeers", and after that new assignments with restored
replica factor wil be picked up from planned assignments.
+ // "resetPeers", and after that new assignments with restored
replica factor will be picked up from planned assignments
+ // for the case of the manual update that was triggered by a user.
invokeClosure = prepareMsInvokeClosure(
partId,
longToBytesKeepingOrder(revision),
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequestSerializer.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestSerializer.java
similarity index 73%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequestSerializer.java
rename to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestSerializer.java
index ee34659b09..50df274644 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/ManualGroupUpdateRequestSerializer.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequestSerializer.java
@@ -28,29 +28,31 @@ import org.apache.ignite.internal.util.io.IgniteDataOutput;
import org.apache.ignite.internal.versioned.VersionedSerializer;
/**
- * {@link VersionedSerializer} for {@link ManualGroupUpdateRequest} instances.
+ * {@link VersionedSerializer} for {@link GroupUpdateRequest} instances.
*/
-class ManualGroupUpdateRequestSerializer extends
VersionedSerializer<ManualGroupUpdateRequest> {
+class GroupUpdateRequestSerializer extends
VersionedSerializer<GroupUpdateRequest> {
/** Serializer instance. */
- static final ManualGroupUpdateRequestSerializer INSTANCE = new
ManualGroupUpdateRequestSerializer();
+ static final GroupUpdateRequestSerializer INSTANCE = new
GroupUpdateRequestSerializer();
@Override
- protected void writeExternalData(ManualGroupUpdateRequest request,
IgniteDataOutput out) throws IOException {
+ protected void writeExternalData(GroupUpdateRequest request,
IgniteDataOutput out) throws IOException {
out.writeUuid(request.operationId());
out.writeVarInt(request.catalogVersion());
out.writeVarInt(request.zoneId());
out.writeVarInt(request.tableId());
writeVarIntSet(request.partitionIds(), out);
+ out.writeBoolean(request.manualUpdate());
}
@Override
- protected ManualGroupUpdateRequest readExternalData(byte protoVer,
IgniteDataInput in) throws IOException {
+ protected GroupUpdateRequest readExternalData(byte protoVer,
IgniteDataInput in) throws IOException {
UUID operationId = in.readUuid();
int catalogVersion = in.readVarIntAsInt();
int zoneId = in.readVarIntAsInt();
int tableId = in.readVarIntAsInt();
Set<Integer> partitionIds = readVarIntSet(in);
+ boolean manualUpdate = in.readBoolean();
- return new ManualGroupUpdateRequest(operationId, catalogVersion,
zoneId, tableId, partitionIds);
+ return new GroupUpdateRequest(operationId, catalogVersion, zoneId,
tableId, partitionIds, manualUpdate);
}
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
index 39c95aee15..a9d1ec0217 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequestSerializerTest.java
@@ -28,41 +28,44 @@ import
org.apache.ignite.internal.versioned.VersionedSerialization;
import org.junit.jupiter.api.Test;
class DisasterRecoveryRequestSerializerTest {
- private static final String MANUAL_GROUP_UPDATE_REQUEST_V1_BASE64 =
"Ae++QwEB775D782rkHhWNBIhQ2WHCbrc/ukH0Q+5FwQWDCA=";
+ private static final String GROUP_UPDATE_REQUEST_V1_BASE64 =
"Ae++QwEB775D782rkHhWNBIhQ2WHCbrc/ukH0Q+5FwQMIBYB";
private static final String MANUAL_GROUP_RESTART_REQUEST_V1_BASE64 =
"Ae++QwIB775D782rkHhWNBIhQ2WHCbrc/tEPuRcEIBYMAwJiAmH///9///+AgAQ=";
private final DisasterRecoveryRequestSerializer serializer = new
DisasterRecoveryRequestSerializer();
@Test
- void serializationAndDeserializationOfManualGroupUpdateRequest() {
- var originalRequest = new ManualGroupUpdateRequest(
+ void serializationAndDeserializationOfGroupUpdateRequest() {
+ var originalRequest = new GroupUpdateRequest(
new UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L),
1000,
2000,
3000,
- Set.of(11, 21, 31)
+ Set.of(11, 21, 31),
+ true
);
byte[] bytes = VersionedSerialization.toBytes(originalRequest,
serializer);
- ManualGroupUpdateRequest restoredRequest = (ManualGroupUpdateRequest)
VersionedSerialization.fromBytes(bytes, serializer);
+ GroupUpdateRequest restoredRequest = (GroupUpdateRequest)
VersionedSerialization.fromBytes(bytes, serializer);
assertThat(restoredRequest.operationId(), is(new
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
assertThat(restoredRequest.catalogVersion(), is(1000));
assertThat(restoredRequest.zoneId(), is(2000));
assertThat(restoredRequest.tableId(), is(3000));
assertThat(restoredRequest.partitionIds(), is(Set.of(11, 21, 31)));
+ assertThat(restoredRequest.manualUpdate(), is(true));
}
@Test
- void v1OfManualGroupUpdateRequestCanBeDeserialized() {
- byte[] bytes =
Base64.getDecoder().decode(MANUAL_GROUP_UPDATE_REQUEST_V1_BASE64);
- ManualGroupUpdateRequest restoredRequest = (ManualGroupUpdateRequest)
VersionedSerialization.fromBytes(bytes, serializer);
+ void v1OfGroupUpdateRequestCanBeDeserialized() {
+ byte[] bytes =
Base64.getDecoder().decode(GROUP_UPDATE_REQUEST_V1_BASE64);
+ GroupUpdateRequest restoredRequest = (GroupUpdateRequest)
VersionedSerialization.fromBytes(bytes, serializer);
assertThat(restoredRequest.operationId(), is(new
UUID(0x1234567890ABCDEFL, 0xFEDCBA0987654321L)));
assertThat(restoredRequest.catalogVersion(), is(1000));
assertThat(restoredRequest.zoneId(), is(2000));
assertThat(restoredRequest.tableId(), is(3000));
assertThat(restoredRequest.partitionIds(), is(Set.of(11, 21, 31)));
+ assertThat(restoredRequest.manualUpdate(), is(true));
}
@Test