This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d6b8b43aa4 IGNITE-23559 resetPartitions improvements: two phase reset
(#4688)
d6b8b43aa4 is described below
commit d6b8b43aa484654f3de9c4e3dd2f6e216d18e2b9
Author: Cyrill <[email protected]>
AuthorDate: Thu Nov 21 19:50:45 2024 +0300
IGNITE-23559 resetPartitions improvements: two phase reset (#4688)
---
.../rebalance/AssignmentUtil.java | 95 +++++
.../rebalance/DistributionZoneRebalanceEngine.java | 42 +-
.../distributionzones/rebalance/RebalanceUtil.java | 163 ++++---
.../ignite/internal/table/ItDurableFinishTest.java | 11 -
.../java/org/apache/ignite/internal/Cluster.java | 7 +
.../internal/ClusterPerTestIntegrationTest.java | 9 +
.../ItDisasterRecoveryReconfigurationTest.java | 468 +++++++++++++++++++--
.../disaster/DisasterRecoveryManager.java | 16 +
.../distributed/disaster/GroupUpdateRequest.java | 196 +++++----
.../disaster/LocalPartitionStateMessageByNode.java | 9 +
10 files changed, 793 insertions(+), 223 deletions(-)
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/AssignmentUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/AssignmentUtil.java
new file mode 100644
index 0000000000..9bac627ecb
--- /dev/null
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/AssignmentUtil.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones.rebalance;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
+
+/**
+ * Utility class with methods for working with assignments.
+ */
+public class AssignmentUtil {
+ /**
+ * Collect partition ids for partition assignments.
+ *
+ * @param partitionIds IDs of partitions to get assignments for. If empty,
IDs of all partitions are returned.
+ * @param numberOfPartitions Number of partitions. Ignored if partition
IDs are specified.
+ */
+ public static int[] partitionIds(Set<Integer> partitionIds, int
numberOfPartitions) {
+ IntStream partitionIdsStream = partitionIds.isEmpty()
+ ? IntStream.range(0, numberOfPartitions)
+ : partitionIds.stream().mapToInt(Integer::intValue);
+
+ return partitionIdsStream.toArray();
+ }
+
+ /**
+ * Collect ids of all partitions: from 0 (inclusive) to {@code numberOfPartitions} (exclusive).
+ *
+ * @param numberOfPartitions Number of partitions.
+ */
+ public static int[] partitionIds(int numberOfPartitions) {
+ return partitionIds(Set.of(), numberOfPartitions);
+ }
+
+ /**
+ * Returns assignments for table partitions from meta storage.
+ *
+ * @param metaStorageManager Meta storage manager.
+ * @param partitionIds IDs of partitions to get assignments for.
+ * @return Future with table assignments as a value.
+ */
+ public static CompletableFuture<Map<Integer, Assignments>>
metastoreAssignments(
+ MetaStorageManager metaStorageManager,
+ int[] partitionIds,
+ Function<Integer, ByteArray> keyForPartition
+ ) {
+ Map<ByteArray, Integer> partitionKeysToPartitionNumber = new
HashMap<>();
+
+ for (int partitionId : partitionIds) {
+
partitionKeysToPartitionNumber.put(keyForPartition.apply(partitionId),
partitionId);
+ }
+
+ return
metaStorageManager.getAll(partitionKeysToPartitionNumber.keySet())
+ .thenApply(entries -> {
+ if (entries.isEmpty()) {
+ return Map.of();
+ }
+
+ Map<Integer, Assignments> result = new HashMap<>();
+
+ for (var mapEntry : entries.entrySet()) {
+ Entry entry = mapEntry.getValue();
+
+ if (!entry.empty() && !entry.tombstone()) {
+
result.put(partitionKeysToPartitionNumber.get(mapEntry.getKey()),
Assignments.fromBytes(entry.value()));
+ }
+ }
+
+ return result.isEmpty() ? Map.of() : result;
+ });
+ }
+}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index c92f85e2b0..ae252da62d 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -33,7 +33,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.Catalog;
@@ -51,7 +50,6 @@ import
org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.Revisions;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
-import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.TestOnly;
@@ -312,53 +310,17 @@ public class DistributionZoneRebalanceEngine {
List<CompletableFuture<?>> tableFutures = new
ArrayList<>(tableDescriptors.size());
for (CatalogTableDescriptor tableDescriptor : tableDescriptors) {
- CompletableFuture<?>[] partitionFutures =
RebalanceUtil.triggerAllTablePartitionsRebalance(
+ tableFutures.add(RebalanceUtil.triggerAllTablePartitionsRebalance(
tableDescriptor,
zoneDescriptor,
dataNodes,
revision,
metaStorageManager,
assignmentsTimestamp
- );
-
- // This set is used to deduplicate exceptions (if there is an
exception from upstream, for instance,
- // when reading from MetaStorage, it will be encountered by every
partition future) to avoid noise
- // in the logs.
- Set<Throwable> unwrappedCauses = ConcurrentHashMap.newKeySet();
-
- for (int partId = 0; partId < partitionFutures.length; partId++) {
- int finalPartId = partId;
-
- partitionFutures[partId].exceptionally(e -> {
- Throwable cause = ExceptionUtils.unwrapCause(e);
-
- if (unwrappedCauses.add(cause)) {
- // The exception is specific to this partition.
- LOG.error(
- "Exception on updating assignments for
[table={}, partition={}]",
- e,
- tableInfo(tableDescriptor), finalPartId
- );
- } else {
- // The exception is from upstream and not specific for
this partition, so don't log the partition index.
- LOG.error(
- "Exception on updating assignments for
[table={}]",
- e,
- tableInfo(tableDescriptor)
- );
- }
-
- return null;
- });
- }
-
- tableFutures.add(allOf(partitionFutures));
+ ));
}
return allOf(tableFutures.toArray(CompletableFuture[]::new));
}
- private static String tableInfo(CatalogTableDescriptor tableDescriptor) {
- return tableDescriptor.id() + "/" + tableDescriptor.name();
- }
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index c859d8a411..a03f2e3b71 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -17,8 +17,11 @@
package org.apache.ignite.internal.distributionzones.rebalance;
+import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
+import static
org.apache.ignite.internal.distributionzones.rebalance.AssignmentUtil.metastoreAssignments;
+import static
org.apache.ignite.internal.distributionzones.rebalance.AssignmentUtil.partitionIds;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.ASSIGNMENT_NOT_UPDATED;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.OUTDATED_UPDATE_RECEIVED;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.PENDING_KEY_UPDATED;
@@ -40,12 +43,12 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import java.nio.charset.StandardCharsets;
import java.util.Collection;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
@@ -60,6 +63,7 @@ import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.ExceptionUtils;
import org.jetbrains.annotations.Nullable;
/**
@@ -276,7 +280,7 @@ public class RebalanceUtil {
* @return Array of futures, one per partition of the table; the futures
complete when the described
* rebalance triggering completes.
*/
- public static CompletableFuture<?>[] triggerAllTablePartitionsRebalance(
+ public static CompletableFuture<Void> triggerAllTablePartitionsRebalance(
CatalogTableDescriptor tableDescriptor,
CatalogZoneDescriptor zoneDescriptor,
Set<String> dataNodes,
@@ -284,39 +288,93 @@ public class RebalanceUtil {
MetaStorageManager metaStorageManager,
long assignmentsTimestamp
) {
- CompletableFuture<Map<Integer, Assignments>> tableAssignmentsFut =
tableAssignments(
- metaStorageManager,
- tableDescriptor.id(),
- Set.of(),
- zoneDescriptor.partitions()
- );
+ int[] partitionIds = partitionIds(zoneDescriptor.partitions());
+
+ return tableStableAssignments(metaStorageManager,
tableDescriptor.id(), partitionIds)
+ .thenCompose(stableAssignments -> {
+ if (stableAssignments.isEmpty()) {
+ return nullCompletedFuture();
+ }
+
+ return tablePartitionAssignment(
+ tableDescriptor,
+ zoneDescriptor,
+ dataNodes,
+ storageRevision,
+ metaStorageManager,
+ assignmentsTimestamp,
+ stableAssignments
+ );
+ });
+ }
+ private static CompletableFuture<Void> tablePartitionAssignment(
+ CatalogTableDescriptor tableDescriptor,
+ CatalogZoneDescriptor zoneDescriptor,
+ Set<String> dataNodes,
+ long storageRevision,
+ MetaStorageManager metaStorageManager,
+ long assignmentsTimestamp,
+ Map<Integer, Assignments> tableAssignments
+ ) {
+ // tableAssignments should not be empty. It is checked for emptiness
before calling this method.
CompletableFuture<?>[] futures = new
CompletableFuture[zoneDescriptor.partitions()];
for (int partId = 0; partId < zoneDescriptor.partitions(); partId++) {
TablePartitionId replicaGrpId = new
TablePartitionId(tableDescriptor.id(), partId);
+ // TODO https://issues.apache.org/jira/browse/IGNITE-19763 We
should distinguish empty stable assignments on
+ // TODO node recovery in case of interrupted table creation, and
moving from empty assignments to non-empty.
+ futures[partId] = updatePendingAssignmentsKeys(
+ tableDescriptor,
+ replicaGrpId,
+ dataNodes,
+ zoneDescriptor.replicas(),
+ storageRevision,
+ metaStorageManager,
+ partId,
+ tableAssignments.get(partId).nodes(),
+ assignmentsTimestamp
+ );
+ }
+
+ // This set is used to deduplicate exceptions (if there is an
exception from upstream, for instance,
+ // when reading from MetaStorage, it will be encountered by every
partition future) to avoid noise
+ // in the logs.
+ Set<Throwable> unwrappedCauses = ConcurrentHashMap.newKeySet();
+
+ for (int partId = 0; partId < futures.length; partId++) {
int finalPartId = partId;
- futures[partId] = tableAssignmentsFut.thenCompose(tableAssignments
->
- // TODO https://issues.apache.org/jira/browse/IGNITE-19763
We should distinguish empty stable assignments on
- // TODO node recovery in case of interrupted table
creation, and moving from empty assignments to non-empty.
- tableAssignments.isEmpty() ? nullCompletedFuture() :
updatePendingAssignmentsKeys(
- tableDescriptor,
- replicaGrpId,
- dataNodes,
- zoneDescriptor.replicas(),
- storageRevision,
- metaStorageManager,
- finalPartId,
- tableAssignments.get(finalPartId).nodes(),
- assignmentsTimestamp
- ));
+ futures[partId].exceptionally(e -> {
+ Throwable cause = ExceptionUtils.unwrapCause(e);
+
+ if (unwrappedCauses.add(cause)) {
+ // The exception is specific to this partition.
+ LOG.error(
+ "Exception on updating assignments for
[tableId={}, name={}, partition={}]",
+ e,
+ tableDescriptor.id(), tableDescriptor.name(),
finalPartId
+ );
+ } else {
+ // The exception is from upstream and not specific to
this partition, so don't log the partition index.
+ LOG.error(
+ "Exception on updating assignments for
[tableId={}, name={}]",
+ e,
+ tableDescriptor.id(), tableDescriptor.name()
+ );
+ }
+
+ return null;
+ });
}
- return futures;
+ return allOf(futures);
}
+ /** Key prefix for planned assignments. */
+ public static final String PLANNED_ASSIGNMENTS_PREFIX =
"assignments.planned.";
+
/** Key prefix for pending assignments. */
public static final String PENDING_ASSIGNMENTS_PREFIX =
"assignments.pending.";
@@ -359,7 +417,7 @@ public class RebalanceUtil {
* @see <a
href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance
documentation</a>
*/
public static ByteArray plannedPartAssignmentsKey(TablePartitionId partId)
{
- return new ByteArray("assignments.planned." + partId);
+ return new ByteArray(PLANNED_ASSIGNMENTS_PREFIX + partId);
}
/**
@@ -521,54 +579,31 @@ public class RebalanceUtil {
}
/**
- * Returns table assignments for table partitions from meta storage.
+ * Returns stable table assignments for table partitions from meta storage.
*
* @param metaStorageManager Meta storage manager.
* @param tableId Table id.
- * @param partitionIds IDs of partitions to get assignments for. If empty,
get all partition assignments.
- * @param numberOfPartitions Number of partitions. Ignored if partition
IDs are specified.
+ * @param partitionIds IDs of partitions to get assignments for.
* @return Future with table assignments as a value.
*/
- public static CompletableFuture<Map<Integer, Assignments>>
tableAssignments(
+ public static CompletableFuture<Map<Integer, Assignments>>
tableStableAssignments(
MetaStorageManager metaStorageManager,
int tableId,
- Set<Integer> partitionIds,
- int numberOfPartitions
+ int[] partitionIds
) {
- IntStream partitionIdsStream = partitionIds.isEmpty()
- ? IntStream.range(0, numberOfPartitions)
- : partitionIds.stream().mapToInt(Integer::intValue);
-
- Map<ByteArray, Integer> partitionKeysToPartitionNumber =
partitionIdsStream.collect(
- HashMap::new,
- (map, partId) -> map.put(stablePartAssignmentsKey(new
TablePartitionId(tableId, partId)), partId),
- Map::putAll
- );
-
- return
metaStorageManager.getAll(partitionKeysToPartitionNumber.keySet())
- .thenApply(entries -> {
- if (entries.isEmpty()) {
- return Map.of();
- }
-
- Map<Integer, Assignments> result = new HashMap<>();
- int numberOfMsPartitions = 0;
-
- for (var mapEntry : entries.entrySet()) {
- Entry entry = mapEntry.getValue();
-
- if (!entry.empty() && !entry.tombstone()) {
-
result.put(partitionKeysToPartitionNumber.get(mapEntry.getKey()),
Assignments.fromBytes(entry.value()));
- numberOfMsPartitions++;
- }
- }
-
- assert numberOfMsPartitions == 0 || numberOfMsPartitions
== entries.size()
- : "Invalid number of stable partition entries
received from meta storage [received="
- + numberOfMsPartitions + ", numberOfPartitions=" +
entries.size() + ", tableId=" + tableId + "].";
-
- return numberOfMsPartitions == 0 ? Map.of() : result;
- });
+ return metastoreAssignments(
+ metaStorageManager,
+ partitionIds,
+ partitionId -> stablePartAssignmentsKey(new
TablePartitionId(tableId, partitionId))
+ ).whenComplete((assignmentsMap, throwable) -> {
+ if (throwable == null) {
+ int numberOfMsPartitions = assignmentsMap.size();
+
+ assert numberOfMsPartitions == 0 || numberOfMsPartitions ==
partitionIds.length
+ : "Invalid number of partition entries received from
meta storage [received="
+ + numberOfMsPartitions + ", numberOfPartitions=" +
partitionIds.length + ", tableId=" + tableId + "].";
+ }
+ });
}
/**
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 0b7fc2c4a0..1c63cd7bfb 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -66,7 +66,6 @@ import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.MismatchingTransactionOutcomeException;
import org.apache.ignite.tx.TransactionException;
-import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
/**
@@ -383,16 +382,6 @@ public class ItDurableFinishTest extends
ClusterPerTestIntegrationTest {
);
}
- private @Nullable Integer nodeIndex(String name) {
- for (int i = 0; i < initialNodes(); i++) {
- if (node(i).name().equals(name)) {
- return i;
- }
- }
-
- return null;
- }
-
private static class Context {
private final IgniteImpl primaryNode;
private final IgniteImpl coordinatorNode;
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index 37cd0dc4fb..a47ce774cd 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -357,6 +357,13 @@ public class Cluster {
return Objects.requireNonNull(nodes.get(index));
}
+ /**
+ * Returns an Ignite node (a member of the cluster) by its index, or {@code null} if no node is present at that index.
+ */
+ public @Nullable Ignite nullableNode(int index) {
+ return nodes.get(index);
+ }
+
/**
* Returns a node that is not stopped and not knocked out (so it can be
used to interact with the cluster).
*/
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 8965cdf32a..5570cf775e 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
@@ -220,6 +221,14 @@ public abstract class ClusterPerTestIntegrationTest
extends BaseIgniteAbstractTe
return cluster.node(index);
}
+ public @Nullable Ignite nullableNode(int index) {
+ return cluster.nullableNode(index);
+ }
+
+ protected int nodeIndex(String name) {
+ return cluster.nodeIndex(name);
+ }
+
protected final IgniteImpl igniteImpl(int index) {
return unwrapIgniteImpl(node(index));
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 405e8b2775..caa19b6702 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -29,6 +29,7 @@ import static
org.apache.ignite.internal.TestWrappers.unwrapTableManager;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static
org.apache.ignite.internal.replicator.configuration.ReplicationConfigurationSchema.DEFAULT_IDLE_SAFE_TIME_PROP_DURATION;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
@@ -54,6 +55,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -63,6 +65,7 @@ import java.util.concurrent.TimeoutException;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.TestWrappers;
import org.apache.ignite.internal.app.IgniteImpl;
@@ -97,7 +100,9 @@ import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
@@ -151,7 +156,9 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
ZoneParams zoneParams = testMethod.getAnnotation(ZoneParams.class);
- startNodesInParallel(IntStream.range(INITIAL_NODES,
zoneParams.nodes()).toArray());
+ IntStream.range(INITIAL_NODES, zoneParams.nodes()).forEach(i ->
cluster.startNode(i));
+ // TODO: IGNITE-22818 Fails with "Race operations took too long"
+ // startNodesInParallel(IntStream.range(INITIAL_NODES,
zoneParams.nodes()).toArray());
executeSql(format("CREATE ZONE %s with replicas=%d, partitions=%d,"
+ " data_nodes_auto_adjust_scale_down=%d,
data_nodes_auto_adjust_scale_up=%d, storage_profiles='%s'",
@@ -236,10 +243,9 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
waitForScale(node0, 3);
- CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetPartitions(
+ CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetAllPartitions(
zoneName,
QUALIFIED_TABLE_NAME,
- Set.of(),
true
);
@@ -323,10 +329,9 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
waitForScale(node0, 1);
- CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetPartitions(
+ CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetAllPartitions(
zoneName,
QUALIFIED_TABLE_NAME,
- Set.of(),
true
);
@@ -374,7 +379,11 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
// Second snapshot causes log truncation.
triggerRaftSnapshot(1, partId);
- unwrapIgniteImpl(node(1)).dropMessages((nodeName, msg) ->
node(3).name().equals(nodeName) && msg instanceof SnapshotMvDataResponse);
+ unwrapIgniteImpl(node(1)).dropMessages((nodeName, msg) -> {
+ Ignite node = nullableNode(3);
+
+ return node != null && node.name().equals(nodeName) && msg
instanceof SnapshotMvDataResponse;
+ });
stopNodesInParallel(4, 5);
waitForScale(node0, 4);
@@ -389,28 +398,29 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
blockRebalanceStableSwitch(partId, assignment013);
- CompletableFuture<Void> resetFuture =
- node0.disasterRecoveryManager().resetPartitions(zoneName,
QUALIFIED_TABLE_NAME, emptySet(), true);
+ CompletableFuture<Void> resetFuture =
node0.disasterRecoveryManager().resetAllPartitions(zoneName,
QUALIFIED_TABLE_NAME, true);
assertThat(resetFuture, willCompleteSuccessfully());
- waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
+ waitForPartitionState(node0, partId,
GlobalPartitionStateEnum.DEGRADED);
var localStatesFut =
node0.disasterRecoveryManager().localPartitionStates(emptySet(),
Set.of(node(3).name()), emptySet());
assertThat(localStatesFut, willCompleteSuccessfully());
Map<TablePartitionId, LocalPartitionStateByNode> localStates =
localStatesFut.join();
assertThat(localStates, is(not(anEmptyMap())));
- assertEquals(LocalPartitionStateEnum.INSTALLING_SNAPSHOT,
localStates.values().iterator().next().values().iterator().next().state);
+ LocalPartitionStateByNode localPartitionStateByNode =
localStates.get(new TablePartitionId(tableId, partId));
+
+ assertEquals(LocalPartitionStateEnum.INSTALLING_SNAPSHOT,
localPartitionStateByNode.values().iterator().next().state);
stopNode(1);
waitForScale(node0, 3);
- waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
+ waitForPartitionState(node0, partId,
GlobalPartitionStateEnum.DEGRADED);
- resetFuture =
node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME,
emptySet(), true);
+ resetFuture =
node0.disasterRecoveryManager().resetAllPartitions(zoneName,
QUALIFIED_TABLE_NAME, true);
assertThat(resetFuture, willCompleteSuccessfully());
- waitForPartitionState(node0, GlobalPartitionStateEnum.AVAILABLE);
+ waitForPartitionState(node0, partId,
GlobalPartitionStateEnum.AVAILABLE);
awaitPrimaryReplica(node0, partId);
assertRealAssignments(node0, partId, 0, 2, 3);
@@ -455,10 +465,9 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
waitForScale(node0, 3);
- CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetPartitions(
+ CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetAllPartitions(
zoneName,
QUALIFIED_TABLE_NAME,
- Set.of(),
false
);
@@ -498,10 +507,9 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
stopNodesInParallel(1);
- CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetPartitions(
+ CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetAllPartitions(
zoneName,
QUALIFIED_TABLE_NAME,
- Set.of(),
false
);
@@ -562,38 +570,394 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
assertFalse(getPendingAssignments(node0, partId).force());
- waitForPartitionState(node0, GlobalPartitionStateEnum.AVAILABLE);
+ waitForPartitionState(node0, partId,
GlobalPartitionStateEnum.AVAILABLE);
executeSql(format("ALTER ZONE %s SET
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
stopNode(4);
CompletableFuture<Void> resetFuture =
- node0.disasterRecoveryManager().resetPartitions(zoneName,
QUALIFIED_TABLE_NAME, emptySet(), false);
+ node0.disasterRecoveryManager().resetAllPartitions(zoneName,
QUALIFIED_TABLE_NAME, false);
assertThat(resetFuture, willCompleteSuccessfully());
- Assignments assignmentForced13 =
Assignments.forced(Set.of(Assignment.forPeer(node(1).name()),
- Assignment.forPeer(node(3).name())), timestamp);
+ Assignments assignmentForced1 =
Assignments.forced(Set.of(Assignment.forPeer(node(1).name())), timestamp);
+
+ assertPendingAssignments(node0, partId, assignmentForced1);
+
+ Assignments assignments13 = Assignments.of(Set.of(
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(3).name())
+ ), timestamp);
- assertPendingAssignments(node0, partId, assignmentForced13);
+ assertPlannedAssignments(node0, partId, assignments13);
}
/**
- * Block rebalance, so stable won't be switched to specified pending.
+ * The test creates a group of 7 nodes, and performs writes in three steps:
+ * <ol>
+ * <li>Write data to all nodes.</li>
+ * <li>Block raft updates on one of the nodes. Write second portion of
data to the group.</li>
+ * <li>Additionally block updates on another node. Write third portion of
data to the group.</li>
+ * </ol>
+ *
+ * <p>As a result, we'll have one node having only data(1), one node
having data(2) and the other 5 nodes having
+ * data(3).
+ * Then we stop 4 out of 5 nodes with data(3) - effectively the majority
of 7 nodes, and call resetPartitions.
+ * The two phase reset should pick the node with the highest raft log
index as the single node for the pending assignments.
*/
- private void blockRebalanceStableSwitch(int partId, Assignments
assignment) {
+ @Test
+ @ZoneParams(nodes = 7, replicas = 7, partitions = 1)
+ void testThoPhaseResetMaxLogIndex() throws Exception {
+ int partId = 0;
+
+ IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
+ int catalogVersion = node0.catalogManager().latestCatalogVersion();
+ long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+ Table table = node0.tables().table(TABLE_NAME);
+
+ awaitPrimaryReplica(node0, partId);
+
+ Set<String> clusterNodeNames = Set.of(
+ node(0).name(),
+ node(1).name(),
+ node(2).name(),
+ node(3).name(),
+ node(4).name(),
+ node(5).name(),
+ node(6).name());
+ assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
+
+ Assignments allAssignments = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(2).name()),
+ Assignment.forPeer(node(3).name()),
+ Assignment.forPeer(node(4).name()),
+ Assignment.forPeer(node(5).name()),
+ Assignment.forPeer(node(6).name())
+ ), timestamp);
+
+ assertStableAssignments(node0, partId, allAssignments);
+
+ // Write data(1) to all seven nodes.
+ List<Throwable> errors = insertValues(table, partId, 0);
+ assertThat(errors, is(empty()));
+
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
+
+ // We filter out the leader to be able to reliably block raft state
transfer.
+ String leaderName = findLeader(1, partId);
+ logger().info("Raft group leader is [id={}]", leaderName);
+
+ // All the nodes except the leader - 6 nodes.
+ List<String> followerNodes = new ArrayList<>(clusterNodeNames);
+ followerNodes.remove(leaderName);
+
+ // The nodes that we block AppendEntriesRequest to.
+ Set<String> blockedNodes = new ConcurrentHashSet<>();
+
+ // Exclude one of the nodes from data(2).
+ int node0IndexInFollowers = followerNodes.indexOf(node0.name());
+ // Make sure node 0 is not stopped. If node0IndexInFollowers==-1, then
node0 is the leader.
+ blockedNodes.add(followerNodes.remove(node0IndexInFollowers == -1 ? 0
: node0IndexInFollowers));
+ logger().info("Blocking updates on nodes [ids={}]", blockedNodes);
+
+ blockMessage((nodeName, msg) -> dataReplicateMessage(nodeName, msg,
tablePartitionId, blockedNodes));
+
+ // Write data(2) to 6 nodes.
+ errors = insertValues(table, partId, 10);
+ assertThat(errors, is(empty()));
+
+ // Exclude one more node from data replication.
+ blockedNodes.add(followerNodes.remove(0));
+ logger().info("Blocking updates on nodes [ids={}]", blockedNodes);
+
+ // Write data(3) to 5 nodes.
+ errors = insertValues(table, partId, 20);
+ assertThat(errors, is(empty()));
+
+ // Disable scale down.
+ executeSql(format("ALTER ZONE %s SET
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+ // Now followerNodes has 4 elements - without the leader and two
blocked nodes. Stop them all.
+ int[] nodesToStop = followerNodes.stream()
+ .mapToInt(this::nodeIndex)
+ .toArray();
+ logger().info("Stopping nodes [id={}]", Arrays.toString(nodesToStop));
+ stopNodesInParallel(nodesToStop);
+
+ // One of them has the most up to date data, the others fall behind.
+ waitForPartitionState(node0, partId,
GlobalPartitionStateEnum.READ_ONLY);
+
+ // Collect nodes that will be the part of the planned assignments.
+ // These are the leader and two blocked nodes.
+ List<String> nodesNamesForFinalAssignments = new
ArrayList<>(blockedNodes);
+ nodesNamesForFinalAssignments.add(leaderName);
+
+ // Unblock raft.
+ blockedNodes.clear();
+
+ CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetAllPartitions(
+ zoneName,
+ QUALIFIED_TABLE_NAME,
+ true
+ );
+ assertThat(updateFuture, willCompleteSuccessfully());
+
+ // Pending is the one with the most up to date log index.
+ Assignments assignmentPending =
Assignments.forced(Set.of(Assignment.forPeer(leaderName)), timestamp);
+
+ assertPendingAssignments(node0, partId, assignmentPending);
+
+ Set<Assignment> peers = nodesNamesForFinalAssignments.stream()
+ .map(Assignment::forPeer)
+ .collect(Collectors.toSet());
+
+ Assignments assignmentsPlanned = Assignments.of(peers, timestamp);
+
+ assertPlannedAssignments(node0, partId, assignmentsPlanned);
+
+ // Wait for the new stable assignments to take effect.
+ executeSql(format("ALTER ZONE %s SET replicas=%d", zoneName, 3));
+
+ waitForPartitionState(node0, partId,
GlobalPartitionStateEnum.AVAILABLE);
+
+ // Make sure the data is present.
+ IntStream.range(0, ENTRIES).forEach(i -> {
+ CompletableFuture<Tuple> fut = table.keyValueView().getAsync(null,
Tuple.create(of("id", i)));
+ assertThat(fut, willCompleteSuccessfully());
+
+ assertEquals(Tuple.create(of("val", i + 20)), fut.join());
+ });
+ }
+
+ /**
+ * The test creates a group of 7 nodes, and performs writes in two steps:
+ * <ol>
+ * <li>Write data to all nodes.</li>
+ * <li>Block raft updates on one of the nodes. Write second portion of
data to the group.</li>
+ * </ol>
+ *
+ * <p>As a result, we'll have one node having only data(1) and the other 6
nodes having
+ * data(2).
+ * Then we stop 4 nodes with data(2) - effectively the majority of 7
nodes, and call resetPartitions.
+ * The two phase reset should pick the node with the highest raft log
index as the single node for the pending assignments.
+ */
+ @Test
+ @ZoneParams(nodes = 7, replicas = 7, partitions = 1)
+ void testThoPhaseResetEqualLogIndex() throws Exception {
+ int partId = 0;
+
+ IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
+ int catalogVersion = node0.catalogManager().latestCatalogVersion();
+ long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+ Table table = node0.tables().table(TABLE_NAME);
+
+ awaitPrimaryReplica(node0, partId);
+
+ Set<String> clusterNodeNames = Set.of(
+ node(0).name(),
+ node(1).name(),
+ node(2).name(),
+ node(3).name(),
+ node(4).name(),
+ node(5).name(),
+ node(6).name());
+ assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
+
+ Assignments allAssignments = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(2).name()),
+ Assignment.forPeer(node(3).name()),
+ Assignment.forPeer(node(4).name()),
+ Assignment.forPeer(node(5).name()),
+ Assignment.forPeer(node(6).name())
+ ), timestamp);
+
+ assertStableAssignments(node0, partId, allAssignments);
+ // Write data(1) to all seven nodes.
+ List<Throwable> errors = insertValues(table, partId, 0);
+ assertThat(errors, is(empty()));
+
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
+
+ // We filter out the leader to be able to reliably block raft state
transfer on the other nodes.
+ String leaderName = findLeader(1, partId);
+ logger().info("Raft group leader is [id={}]", leaderName);
+
+ // All the nodes except the leader - 6 nodes.
+ List<String> followerNodes = new ArrayList<>(clusterNodeNames);
+ followerNodes.remove(leaderName);
+
+ // The nodes that we block AppendEntriesRequest to.
+ Set<String> blockedNodes = new ConcurrentHashSet<>();
+
+ // Exclude one of the nodes from data(2).
+ int node0IndexInFollowers = followerNodes.indexOf(node0.name());
+ // Make sure node 0 is not stopped. If node0IndexInFollowers==-1, then
node0 is the leader.
+ String blockedNode = followerNodes.remove(node0IndexInFollowers == -1
? 0 : node0IndexInFollowers);
+ blockedNodes.add(blockedNode);
+ logger().info("Blocking updates on nodes [ids={}]", blockedNodes);
+
+ blockMessage((nodeName, msg) -> dataReplicateMessage(nodeName, msg,
tablePartitionId, blockedNodes));
+
+ // Write data(2) to 6 nodes.
+ errors = insertValues(table, partId, 10);
+ assertThat(errors, is(empty()));
+
+ // Disable scale down.
+ executeSql(format("ALTER ZONE %s SET
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+ List<String> nodeNamesToStop = new ArrayList<>(followerNodes);
+ // Make sure there are 4 elements in this list.
+ nodeNamesToStop.remove(0);
+ // Stop them all.
+ int[] nodesToStop = nodeNamesToStop.stream()
+ .mapToInt(this::nodeIndex)
+ .toArray();
+ logger().info("Stopping nodes [id={}]", Arrays.toString(nodesToStop));
+ stopNodesInParallel(nodesToStop);
+
+ // Unblock raft.
+ blockedNodes.clear();
+
+ // Two nodes should have the most up to date data, one falls behind.
+ waitForPartitionState(node0, partId,
GlobalPartitionStateEnum.READ_ONLY);
+
+ // Collect nodes that will be the part of the planned assignments.
+ List<String> nodesNamesForFinalAssignments = new
ArrayList<>(clusterNodeNames);
+ nodesNamesForFinalAssignments.removeAll(nodeNamesToStop);
+
+ CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetAllPartitions(
+ zoneName,
+ QUALIFIED_TABLE_NAME,
+ false
+ );
+ assertThat(updateFuture, willCompleteSuccessfully());
+
+ String pendingNodeName =
getPendingNodeName(nodesNamesForFinalAssignments, blockedNode);
+
+ // Pending is the one with the most up to date log index.
+ Assignments assignmentPending =
Assignments.forced(Set.of(Assignment.forPeer(pendingNodeName)), timestamp);
+
+ assertPendingAssignments(node0, partId, assignmentPending);
+
+ Set<Assignment> peers = nodesNamesForFinalAssignments.stream()
+ .map(Assignment::forPeer)
+ .collect(Collectors.toSet());
+
+ Assignments assignmentsPlanned = Assignments.of(peers, timestamp);
+
+ assertPlannedAssignments(node0, partId, assignmentsPlanned);
+
+ // Wait for the new stable assignments to take effect.
+ executeSql(format("ALTER ZONE %s SET replicas=%d", zoneName, 3));
+
+ waitForPartitionState(node0, partId,
GlobalPartitionStateEnum.AVAILABLE);
+
+ // Make sure the data is present.
+ IntStream.range(0, ENTRIES).forEach(i -> {
+ CompletableFuture<Tuple> fut = table.keyValueView().getAsync(null,
Tuple.create(of("id", i)));
+ assertThat(fut, willCompleteSuccessfully());
+
+ assertEquals(Tuple.create(of("val", i + 10)), fut.join());
+ });
+ }
+
+ @Test
+ @ZoneParams(nodes = 6, replicas = 3, partitions = 1)
+ void testTwoPhaseResetOnEmptyNodes() throws Exception {
+ int partId = 0;
+
+ IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
+ int catalogVersion = node0.catalogManager().latestCatalogVersion();
+ long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+
+ awaitPrimaryReplica(node0, partId);
+
+ assertRealAssignments(node0, partId, 2, 3, 5);
+
+ Assignments blockedRebalance = Assignments.of(timestamp,
+ Assignment.forPeer(node(0).name())
+ );
+
+ blockRebalanceStableSwitch(partId, blockedRebalance);
+
+ stopNodesInParallel(2, 3, 5);
+
+ waitForScale(node0, 3);
+
+ CompletableFuture<?> updateFuture =
node0.disasterRecoveryManager().resetAllPartitions(
+ zoneName,
+ QUALIFIED_TABLE_NAME,
+ true
+ );
+
+ assertThat(updateFuture, willCompleteSuccessfully());
+
+ awaitPrimaryReplica(node0, partId);
+
+ assertRealAssignments(node0, partId, 0, 1, 4);
+
+ Assignments assignmentForced1 =
Assignments.forced(Set.of(Assignment.forPeer(node(0).name())), timestamp);
+
+ assertPendingAssignments(node0, partId, assignmentForced1);
+
+ Assignments assignments13 = Assignments.of(Set.of(
+ Assignment.forPeer(node(0).name()),
+ Assignment.forPeer(node(1).name()),
+ Assignment.forPeer(node(4).name())
+ ), timestamp);
+
+ assertPlannedAssignments(node0, partId, assignments13);
+ }
+
+ private static String getPendingNodeName(List<String> aliveNodes, String
blockedNode) {
+ List<String> candidates = new ArrayList<>(aliveNodes);
+ candidates.remove(blockedNode);
+
+ // Without the blocked node, the other two should have the same raft
log index.
+ // Pick the one whose name comes first in the lexicographic order.
+ candidates.sort(String::compareTo);
+
+ return candidates.get(0);
+ }
+
+ private void blockMessage(BiPredicate<String, NetworkMessage> predicate) {
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).forEach(node -> {
- BiPredicate<String, NetworkMessage> newPredicate = (nodeName, msg)
-> stableKeySwitchMessage(msg, partId, assignment);
BiPredicate<String, NetworkMessage> oldPredicate =
node.dropMessagesPredicate();
if (oldPredicate == null) {
- node.dropMessages(newPredicate);
+ node.dropMessages(predicate);
} else {
- node.dropMessages(oldPredicate.or(newPredicate));
+ node.dropMessages(oldPredicate.or(predicate));
}
});
}
+ private static boolean dataReplicateMessage(
+ String nodeName,
+ NetworkMessage msg,
+ TablePartitionId tablePartitionId,
+ Set<String> blockedNodes
+ ) {
+ if (msg instanceof AppendEntriesRequest) {
+ var appendEntriesRequest = (AppendEntriesRequest) msg;
+ if
(tablePartitionId.toString().equals(appendEntriesRequest.groupId())) {
+ return blockedNodes.contains(nodeName);
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Block rebalance, so stable won't be switched to specified pending.
+ */
+ private void blockRebalanceStableSwitch(int partId, Assignments
assignment) {
+ blockMessage((nodeName, msg) -> stableKeySwitchMessage(msg, partId,
assignment));
+ }
+
private boolean stableKeySwitchMessage(NetworkMessage msg, int partId,
Assignments blockedAssignments) {
if (msg instanceof WriteActionRequest) {
var writeActionRequest = (WriteActionRequest) msg;
@@ -624,7 +988,7 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
return false;
}
- private void waitForPartitionState(IgniteImpl node0,
GlobalPartitionStateEnum expectedState) throws InterruptedException {
+ private void waitForPartitionState(IgniteImpl node0, int partId,
GlobalPartitionStateEnum expectedState) throws InterruptedException {
assertTrue(waitForCondition(() -> {
CompletableFuture<Map<TablePartitionId, GlobalPartitionState>>
statesFuture = node0.disasterRecoveryManager()
.globalPartitionStates(Set.of(zoneName), emptySet());
@@ -633,15 +997,26 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
Map<TablePartitionId, GlobalPartitionState> map =
statesFuture.join();
- GlobalPartitionState state = map.values().iterator().next();
+ GlobalPartitionState state = map.get(new TablePartitionId(tableId,
partId));
- return state.state == expectedState;
+ return state != null && state.state == expectedState;
}, 500, 20_000),
() -> "Expected state: " + expectedState
+ ", actual: " +
node0.disasterRecoveryManager().globalPartitionStates(Set.of(zoneName),
emptySet()).join()
);
}
+ private String findLeader(int nodeIdx, int partId) {
+ IgniteImpl node = unwrapIgniteImpl(node(nodeIdx));
+
+ var raftNodeId = new RaftNodeId(new TablePartitionId(tableId, partId),
new Peer(node.name()));
+ var jraftServer = (JraftServerImpl) node.raftManager().server();
+
+ RaftGroupService raftGroupService =
jraftServer.raftGroupService(raftNodeId);
+ assertNotNull(raftGroupService);
+ return raftGroupService.getRaftNode().getLeaderId().getConsistentId();
+ }
+
private void triggerRaftSnapshot(int nodeIdx, int partId) throws
InterruptedException, ExecutionException {
//noinspection resource
IgniteImpl node = unwrapIgniteImpl(node(nodeIdx));
@@ -680,6 +1055,20 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
);
}
+ private void assertPlannedAssignments(IgniteImpl node0, int partId,
Assignments expected) throws InterruptedException {
+ assertTrue(
+ waitForCondition(() ->
expected.equals(getPlannedAssignments(node0, partId)), 2000),
+ () -> "Expected: " + expected + ", actual: " +
getPlannedAssignments(node0, partId)
+ );
+ }
+
+ private void assertStableAssignments(IgniteImpl node0, int partId,
Assignments expected) throws InterruptedException {
+ assertTrue(
+ waitForCondition(() ->
expected.equals(getStableAssignments(node0, partId)), 2000),
+ () -> "Expected: " + expected + ", actual: " +
getStableAssignments(node0, partId)
+ );
+ }
+
/**
* Inserts {@value ENTRIES} values into a table, expecting either a
success or specific set of exceptions that would indicate
* replication issues. Collects such exceptions into a list and returns.
Fails if unexpected exception happened.
@@ -776,8 +1165,19 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
.collect(Collectors.toList());
}
- private @Nullable Assignments getPendingAssignments(IgniteImpl node0, int
partId) {
- CompletableFuture<Entry> pendingFut = node0.metaStorageManager()
+ private @Nullable Assignments getPlannedAssignments(IgniteImpl node, int
partId) {
+ CompletableFuture<Entry> plannedFut = node.metaStorageManager()
+ .get(plannedPartAssignmentsKey(new TablePartitionId(tableId,
partId)));
+
+ assertThat(plannedFut, willCompleteSuccessfully());
+
+ Entry planned = plannedFut.join();
+
+ return planned.empty() ? null : Assignments.fromBytes(planned.value());
+ }
+
+ private @Nullable Assignments getPendingAssignments(IgniteImpl node, int
partId) {
+ CompletableFuture<Entry> pendingFut = node.metaStorageManager()
.get(pendingPartAssignmentsKey(new TablePartitionId(tableId,
partId)));
assertThat(pendingFut, willCompleteSuccessfully());
@@ -787,8 +1187,8 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
return pending.empty() ? null : Assignments.fromBytes(pending.value());
}
- private @Nullable Assignments getStableAssignments(IgniteImpl node0, int
partId) {
- CompletableFuture<Entry> stableFut = node0.metaStorageManager()
+ private @Nullable Assignments getStableAssignments(IgniteImpl node, int
partId) {
+ CompletableFuture<Entry> stableFut = node.metaStorageManager()
.get(stablePartAssignmentsKey(new TablePartitionId(tableId,
partId)));
assertThat(stableFut, willCompleteSuccessfully());
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 74994de8c7..4b99b1ea8f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table.distributed.disaster;
import static java.util.Collections.emptyList;
+import static java.util.Collections.emptySet;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.stream.Collectors.groupingBy;
@@ -241,6 +242,21 @@ public class DisasterRecoveryManager implements
IgniteComponent, SystemViewProvi
);
}
+ /**
+ * Updates assignments of all partitions of the table in a forced manner, allowing for the
recovery of raft groups with lost majorities. It is achieved via
+ * triggering a new rebalance with {@code force} flag enabled in {@link
Assignments} for partitions where it's required. New pending
+ * assignments with {@code force} flag remove old stable nodes from the
distribution, and force new Raft configuration via "resetPeers"
+ * so that a new leader could be elected.
+ *
+ * @param zoneName Name of the distribution zone. Case-sensitive, without
quotes.
+ * @param tableName Fully-qualified table name. Case-sensitive, without
quotes. Example: "PUBLIC.Foo".
+ * @param manualUpdate Whether the update is triggered manually by user or
automatically by core logic.
+ * @return Future that completes when partitions are reset.
+ */
+ public CompletableFuture<Void> resetAllPartitions(String zoneName, String
tableName, boolean manualUpdate) {
+ return resetPartitions(zoneName, tableName, emptySet(), manualUpdate);
+ }
+
/**
* Updates assignments of the table in a forced manner, allowing for the
recovery of raft group with lost majorities. It is achieved via
* triggering a new rebalance with {@code force} flag enabled in {@link
Assignments} for partitions where it's required. New pending
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
index 2abaffaf99..f97386ee8a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
@@ -28,7 +28,7 @@ import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignments;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableStableAssignments;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
@@ -44,22 +44,26 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import static
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.CLUSTER_NOT_IDLE_ERR;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
-import java.util.stream.IntStream;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
+import org.apache.ignite.internal.distributionzones.rebalance.AssignmentUtil;
import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
import
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
@@ -74,6 +78,8 @@ import org.apache.ignite.internal.util.CollectionUtils;
import org.jetbrains.annotations.Nullable;
class GroupUpdateRequest implements DisasterRecoveryRequest {
+ private static final IgniteLogger LOG =
Loggers.forClass(GroupUpdateRequest.class);
+
private final UUID operationId;
/**
@@ -155,7 +161,9 @@ class GroupUpdateRequest implements DisasterRecoveryRequest
{
.map(NodeWithAttributes::nodeName)
.collect(toSet());
- CompletableFuture<?>[] futures = forceAssignmentsUpdate(
+ int[] partitionIdsArray =
AssignmentUtil.partitionIds(partitionIds, zoneDescriptor.partitions());
+
+ return forceAssignmentsUpdate(
tableDescriptor,
zoneDescriptor,
dataNodes,
@@ -163,11 +171,18 @@ class GroupUpdateRequest implements
DisasterRecoveryRequest {
msRevision,
disasterRecoveryManager.metaStorageManager,
localStatesMap,
- catalog.time()
+ catalog.time(),
+ partitionIdsArray,
+ manualUpdate
);
-
- return allOf(futures);
- }).thenCompose(Function.identity());
+ })
+ .thenCompose(Function.identity())
+ .whenComplete((unused, throwable) -> {
+ // TODO: IGNITE-23635 Add fail handling for failed resetPeers
+ if (throwable != null) {
+ LOG.error("Failed to reset partition", throwable);
+ }
+ });
}
/**
@@ -181,10 +196,11 @@ class GroupUpdateRequest implements
DisasterRecoveryRequest {
* @param aliveNodesConsistentIds Set of alive nodes according to logical
topology.
* @param revision Meta-storage revision to be associated with
reassignment.
* @param metaStorageManager Meta-storage manager.
- * @param localStatesMap Local partition states retrieved by {@link
DisasterRecoveryManager#localPartitionStates(Set, Set, Set)}.
+ * @param localStatesMap Local partition states retrieved by
+ * {@link DisasterRecoveryManager#localPartitionStates(Set, Set,
Set)}.
* @return A future that will be completed when reassignments data is
written into a meta-storage, if that's required.
*/
- private CompletableFuture<?>[] forceAssignmentsUpdate(
+ private static CompletableFuture<Void> forceAssignmentsUpdate(
CatalogTableDescriptor tableDescriptor,
CatalogZoneDescriptor zoneDescriptor,
Set<String> dataNodes,
@@ -192,47 +208,71 @@ class GroupUpdateRequest implements
DisasterRecoveryRequest {
long revision,
MetaStorageManager metaStorageManager,
Map<TablePartitionId, LocalPartitionStateMessageByNode>
localStatesMap,
- long assignmentsTimestamp
+ long assignmentsTimestamp,
+ int[] partitionIds,
+ boolean manualUpdate
) {
- CompletableFuture<Map<Integer, Assignments>> tableAssignmentsFut =
tableAssignments(
- metaStorageManager,
- tableDescriptor.id(),
- partitionIds,
- zoneDescriptor.partitions()
- );
-
- Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes,
aliveNodesConsistentIds);
-
- int[] partitionIdsArray = partitionIds.isEmpty()
- ? IntStream.range(0, zoneDescriptor.partitions()).toArray()
- : partitionIds.stream().mapToInt(Integer::intValue).toArray();
-
- CompletableFuture<?>[] futures = new
CompletableFuture[partitionIdsArray.length];
-
- for (int partitionId = 0; partitionId < partitionIdsArray.length;
partitionId++) {
- TablePartitionId replicaGrpId = new
TablePartitionId(tableDescriptor.id(), partitionIdsArray[partitionId]);
+ return tableStableAssignments(metaStorageManager,
tableDescriptor.id(), partitionIds)
+ .thenCompose(tableAssignments -> {
+ if (tableAssignments.isEmpty()) {
+ return nullCompletedFuture();
+ }
- futures[partitionId] =
tableAssignmentsFut.thenCompose(tableAssignments ->
- tableAssignments.isEmpty() ? nullCompletedFuture() :
partitionUpdate(
- replicaGrpId,
- aliveDataNodes,
+ return updateAssignments(
+ tableDescriptor,
+ zoneDescriptor,
+ dataNodes,
aliveNodesConsistentIds,
- zoneDescriptor.replicas(),
revision,
metaStorageManager,
-
tableAssignments.get(replicaGrpId.partitionId()).nodes(),
- localStatesMap.get(replicaGrpId),
+ localStatesMap,
assignmentsTimestamp,
- this.manualUpdate
- )).thenAccept(res -> {
- DisasterRecoveryManager.LOG.info(
- "Partition {} returned {} status on reset
attempt", replicaGrpId, UpdateStatus.valueOf(res)
- );
- }
- );
+ partitionIds,
+ tableAssignments,
+ manualUpdate
+ );
+ });
+ }
+
+ private static CompletableFuture<Void> updateAssignments(
+ CatalogTableDescriptor tableDescriptor,
+ CatalogZoneDescriptor zoneDescriptor,
+ Set<String> dataNodes,
+ Set<String> aliveNodesConsistentIds,
+ long revision,
+ MetaStorageManager metaStorageManager,
+ Map<TablePartitionId, LocalPartitionStateMessageByNode>
localStatesMap,
+ long assignmentsTimestamp,
+ int[] partitionIds,
+ Map<Integer, Assignments> tableAssignments,
+ boolean manualUpdate
+ ) {
+ Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes,
aliveNodesConsistentIds);
+
+ CompletableFuture<?>[] futures = new
CompletableFuture[partitionIds.length];
+
+ for (int i = 0; i < partitionIds.length; i++) {
+ TablePartitionId replicaGrpId = new
TablePartitionId(tableDescriptor.id(), partitionIds[i]);
+
+ futures[i] = partitionUpdate(
+ replicaGrpId,
+ aliveDataNodes,
+ aliveNodesConsistentIds,
+ zoneDescriptor.replicas(),
+ revision,
+ metaStorageManager,
+ tableAssignments.get(replicaGrpId.partitionId()).nodes(),
+ localStatesMap.get(replicaGrpId),
+ assignmentsTimestamp,
+ manualUpdate
+ ).thenAccept(res -> {
+ DisasterRecoveryManager.LOG.info(
+ "Partition {} returned {} status on reset attempt",
replicaGrpId, UpdateStatus.valueOf(res)
+ );
+ });
}
- return futures;
+ return allOf(futures);
}
private static CompletableFuture<Integer> partitionUpdate(
@@ -254,44 +294,53 @@ class GroupUpdateRequest implements
DisasterRecoveryRequest {
return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
}
- Iif invokeClosure;
-
- if (aliveStableNodes.isEmpty()) {
- if (!manualUpdate) {
- return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
- }
+ if (aliveStableNodes.isEmpty() && !manualUpdate) {
+ return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
+ }
+ if (manualUpdate) {
enrichAssignments(partId, aliveDataNodes, replicas,
partAssignments);
+ }
- // There are no known nodes with data, which means that we can
just put new assignments into pending assignments with "forced"
- // flag.
- invokeClosure = prepareMsInvokeClosure(
- partId,
- longToBytesKeepingOrder(revision),
- Assignments.forced(partAssignments,
assignmentsTimestamp).toBytes(),
- null
- );
- } else {
- Set<Assignment> stableAssignments = Set.copyOf(partAssignments);
-
- if (manualUpdate) {
- enrichAssignments(partId, aliveDataNodes, replicas,
partAssignments);
- }
+ Assignment nextAssignment =
nextAssignment(localPartitionStateMessageByNode, partAssignments);
- // There are nodes with data, and we set pending assignments to
this set of nodes. It'll be the source of peers for
- // "resetPeers", and after that new assignments with restored
replica factor wil be picked up from planned assignments
- // for the case of the manual update, that was triggered by a user.
- invokeClosure = prepareMsInvokeClosure(
- partId,
- longToBytesKeepingOrder(revision),
- Assignments.forced(stableAssignments,
assignmentsTimestamp).toBytes(),
- Assignments.toBytes(partAssignments, assignmentsTimestamp)
- );
- }
+ // There are nodes with data, and we set pending assignments to this
set of nodes. It'll be the source of peers for
+ // "resetPeers", and after that new assignments with restored replica
factor will be picked up from planned assignments
+ // for the case of the manual update, that was triggered by a user.
+ Iif invokeClosure = prepareMsInvokeClosure(
+ partId,
+ longToBytesKeepingOrder(revision),
+ Assignments.forced(Set.of(nextAssignment),
assignmentsTimestamp).toBytes(),
+ Assignments.toBytes(partAssignments, assignmentsTimestamp)
+ );
return
metaStorageMgr.invoke(invokeClosure).thenApply(StatementResult::getAsInt);
}
+ /**
+ * Returns an assignment with the most up to date log index. If there is
more than one node with the same index,
+ * returns the first one in the lexicographic order.
+ */
+ private static Assignment nextAssignment(
+ LocalPartitionStateMessageByNode localPartitionStateByNode,
+ Set<Assignment> assignments
+ ) {
+ // For nodes that we know log index for (having any data), we choose
the node with the highest log index.
+ // If there is more than one node with the same log index, we choose the
one with the first consistent id in the lexicographic order.
+ Optional<Assignment> nodeWithMaxLogIndex = assignments.stream()
+ .filter(assignment ->
localPartitionStateByNode.partitionState(assignment.consistentId()) != null)
+ .min(Comparator.<Assignment>comparingLong(
+ node ->
localPartitionStateByNode.partitionState(node.consistentId()).logIndex()
+ )
+ .reversed()
+ .thenComparing(Assignment::consistentId))
+ // If there are no nodes with data, we choose the node with
the first consistent id in the lexicographic order.
+ .or(() ->
assignments.stream().min(Comparator.comparing(Assignment::consistentId)));
+
+ // TODO: IGNITE-23737 The case with no suitable nodes should be
handled.
+ return nodeWithMaxLogIndex.orElseThrow();
+ }
+
/**
* Creates an {@link Iif} instance for meta-storage's {@link
MetaStorageManager#invoke(Iif)} call. Does the following:
* <ul>
@@ -371,8 +420,7 @@ class GroupUpdateRequest implements DisasterRecoveryRequest
{
int replicas,
Set<Assignment> partAssignments
) {
- Set<Assignment> calcAssignments =
calculateAssignmentForPartition(aliveDataNodes, partId.partitionId(),
- replicas);
+ Set<Assignment> calcAssignments =
calculateAssignmentForPartition(aliveDataNodes, partId.partitionId(), replicas);
for (Assignment calcAssignment : calcAssignments) {
if (partAssignments.size() == replicas) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
index 4e0fc5f228..b1ba02cad3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
@@ -51,6 +51,15 @@ public class LocalPartitionStateMessageByNode {
map.put(nodeName, state);
}
+ /**
+ * Returns the partition state stored for the given node, or {@code null} if there is none.
+ *
+ * @param node Consistent ID of the node.
+ */
+ public LocalPartitionStateMessage partitionState(String node) {
+ return map.get(node);
+ }
+
@Override
public String toString() {
return map.toString();