This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d6b8b43aa4 IGNITE-23559 resetPartitions improvements: two phase reset (#4688)
d6b8b43aa4 is described below

commit d6b8b43aa484654f3de9c4e3dd2f6e216d18e2b9
Author: Cyrill <[email protected]>
AuthorDate: Thu Nov 21 19:50:45 2024 +0300

    IGNITE-23559 resetPartitions improvements: two phase reset (#4688)
---
 .../rebalance/AssignmentUtil.java                  |  95 +++++
 .../rebalance/DistributionZoneRebalanceEngine.java |  42 +-
 .../distributionzones/rebalance/RebalanceUtil.java | 163 ++++---
 .../ignite/internal/table/ItDurableFinishTest.java |  11 -
 .../java/org/apache/ignite/internal/Cluster.java   |   7 +
 .../internal/ClusterPerTestIntegrationTest.java    |   9 +
 .../ItDisasterRecoveryReconfigurationTest.java     | 468 +++++++++++++++++++--
 .../disaster/DisasterRecoveryManager.java          |  16 +
 .../distributed/disaster/GroupUpdateRequest.java   | 196 +++++----
 .../disaster/LocalPartitionStateMessageByNode.java |   9 +
 10 files changed, 793 insertions(+), 223 deletions(-)

diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/AssignmentUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/AssignmentUtil.java
new file mode 100644
index 0000000000..9bac627ecb
--- /dev/null
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/AssignmentUtil.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.distributionzones.rebalance;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
+
+/**
+ * Util class for methods to work with the assignments.
+ */
+public class AssignmentUtil {
+    /**
+     * Collect partition ids for partition assignments.
+     *
+     * @param partitionIds IDs of partitions to get assignments for. If empty, 
get all partition assignments.
+     * @param numberOfPartitions Number of partitions. Ignored if partition 
IDs are specified.
+     */
+    public static int[] partitionIds(Set<Integer> partitionIds, int 
numberOfPartitions) {
+        IntStream partitionIdsStream = partitionIds.isEmpty()
+                ? IntStream.range(0, numberOfPartitions)
+                : partitionIds.stream().mapToInt(Integer::intValue);
+
+        return partitionIdsStream.toArray();
+    }
+
+    /**
+     * Collect partition ids for partition assignments.
+     *
+     * @param numberOfPartitions Number of partitions.
+     */
+    public static int[] partitionIds(int numberOfPartitions) {
+        return partitionIds(Set.of(), numberOfPartitions);
+    }
+
+    /**
+     * Returns assignments for table partitions from meta storage.
+     *
+     * @param metaStorageManager Meta storage manager.
+     * @param partitionIds IDs of partitions to get assignments for.
+     * @return Future with table assignments as a value.
+     */
+    public static CompletableFuture<Map<Integer, Assignments>> 
metastoreAssignments(
+            MetaStorageManager metaStorageManager,
+            int[] partitionIds,
+            Function<Integer, ByteArray> keyForPartition
+    ) {
+        Map<ByteArray, Integer> partitionKeysToPartitionNumber = new 
HashMap<>();
+
+        for (int partitionId : partitionIds) {
+            
partitionKeysToPartitionNumber.put(keyForPartition.apply(partitionId), 
partitionId);
+        }
+
+        return 
metaStorageManager.getAll(partitionKeysToPartitionNumber.keySet())
+                .thenApply(entries -> {
+                    if (entries.isEmpty()) {
+                        return Map.of();
+                    }
+
+                    Map<Integer, Assignments> result = new HashMap<>();
+
+                    for (var mapEntry : entries.entrySet()) {
+                        Entry entry = mapEntry.getValue();
+
+                        if (!entry.empty() && !entry.tombstone()) {
+                            
result.put(partitionKeysToPartitionNumber.get(mapEntry.getKey()), 
Assignments.fromBytes(entry.value()));
+                        }
+                    }
+
+                    return result.isEmpty() ? Map.of() : result;
+                });
+    }
+}
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
index c92f85e2b0..ae252da62d 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java
@@ -33,7 +33,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.catalog.Catalog;
@@ -51,7 +50,6 @@ import 
org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.Revisions;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
-import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.TestOnly;
@@ -312,53 +310,17 @@ public class DistributionZoneRebalanceEngine {
         List<CompletableFuture<?>> tableFutures = new 
ArrayList<>(tableDescriptors.size());
 
         for (CatalogTableDescriptor tableDescriptor : tableDescriptors) {
-            CompletableFuture<?>[] partitionFutures = 
RebalanceUtil.triggerAllTablePartitionsRebalance(
+            tableFutures.add(RebalanceUtil.triggerAllTablePartitionsRebalance(
                     tableDescriptor,
                     zoneDescriptor,
                     dataNodes,
                     revision,
                     metaStorageManager,
                     assignmentsTimestamp
-            );
-
-            // This set is used to deduplicate exceptions (if there is an 
exception from upstream, for instance,
-            // when reading from MetaStorage, it will be encountered by every 
partition future) to avoid noise
-            // in the logs.
-            Set<Throwable> unwrappedCauses = ConcurrentHashMap.newKeySet();
-
-            for (int partId = 0; partId < partitionFutures.length; partId++) {
-                int finalPartId = partId;
-
-                partitionFutures[partId].exceptionally(e -> {
-                    Throwable cause = ExceptionUtils.unwrapCause(e);
-
-                    if (unwrappedCauses.add(cause)) {
-                        // The exception is specific to this partition.
-                        LOG.error(
-                                "Exception on updating assignments for 
[table={}, partition={}]",
-                                e,
-                                tableInfo(tableDescriptor), finalPartId
-                        );
-                    } else {
-                        // The exception is from upstream and not specific for 
this partition, so don't log the partition index.
-                        LOG.error(
-                                "Exception on updating assignments for 
[table={}]",
-                                e,
-                                tableInfo(tableDescriptor)
-                        );
-                    }
-
-                    return null;
-                });
-            }
-
-            tableFutures.add(allOf(partitionFutures));
+            ));
         }
 
         return allOf(tableFutures.toArray(CompletableFuture[]::new));
     }
 
-    private static String tableInfo(CatalogTableDescriptor tableDescriptor) {
-        return tableDescriptor.id() + "/" + tableDescriptor.name();
-    }
 }
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index c859d8a411..a03f2e3b71 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -17,8 +17,11 @@
 
 package org.apache.ignite.internal.distributionzones.rebalance;
 
+import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.AssignmentUtil.metastoreAssignments;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.AssignmentUtil.partitionIds;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.ASSIGNMENT_NOT_UPDATED;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.OUTDATED_UPDATE_RECEIVED;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus.PENDING_KEY_UPDATED;
@@ -40,12 +43,12 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
@@ -60,6 +63,7 @@ import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.ExceptionUtils;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -276,7 +280,7 @@ public class RebalanceUtil {
      * @return Array of futures, one per partition of the table; the futures 
complete when the described
      *     rebalance triggering completes.
      */
-    public static CompletableFuture<?>[] triggerAllTablePartitionsRebalance(
+    public static CompletableFuture<Void> triggerAllTablePartitionsRebalance(
             CatalogTableDescriptor tableDescriptor,
             CatalogZoneDescriptor zoneDescriptor,
             Set<String> dataNodes,
@@ -284,39 +288,93 @@ public class RebalanceUtil {
             MetaStorageManager metaStorageManager,
             long assignmentsTimestamp
     ) {
-        CompletableFuture<Map<Integer, Assignments>> tableAssignmentsFut = 
tableAssignments(
-                metaStorageManager,
-                tableDescriptor.id(),
-                Set.of(),
-                zoneDescriptor.partitions()
-        );
+        int[] partitionIds = partitionIds(zoneDescriptor.partitions());
+
+        return tableStableAssignments(metaStorageManager, 
tableDescriptor.id(), partitionIds)
+                .thenCompose(stableAssignments -> {
+                    if (stableAssignments.isEmpty()) {
+                        return nullCompletedFuture();
+                    }
+
+                    return tablePartitionAssignment(
+                            tableDescriptor,
+                            zoneDescriptor,
+                            dataNodes,
+                            storageRevision,
+                            metaStorageManager,
+                            assignmentsTimestamp,
+                            stableAssignments
+                    );
+                });
+    }
 
+    private static CompletableFuture<Void> tablePartitionAssignment(
+            CatalogTableDescriptor tableDescriptor,
+            CatalogZoneDescriptor zoneDescriptor,
+            Set<String> dataNodes,
+            long storageRevision,
+            MetaStorageManager metaStorageManager,
+            long assignmentsTimestamp,
+            Map<Integer, Assignments> tableAssignments
+    ) {
+        // tableAssignments should not be empty. It is checked for emptiness 
before calling this method.
         CompletableFuture<?>[] futures = new 
CompletableFuture[zoneDescriptor.partitions()];
 
         for (int partId = 0; partId < zoneDescriptor.partitions(); partId++) {
             TablePartitionId replicaGrpId = new 
TablePartitionId(tableDescriptor.id(), partId);
 
+            // TODO https://issues.apache.org/jira/browse/IGNITE-19763 We 
should distinguish empty stable assignments on
+            // TODO node recovery in case of interrupted table creation, and 
moving from empty assignments to non-empty.
+            futures[partId] = updatePendingAssignmentsKeys(
+                    tableDescriptor,
+                    replicaGrpId,
+                    dataNodes,
+                    zoneDescriptor.replicas(),
+                    storageRevision,
+                    metaStorageManager,
+                    partId,
+                    tableAssignments.get(partId).nodes(),
+                    assignmentsTimestamp
+            );
+        }
+
+        // This set is used to deduplicate exceptions (if there is an 
exception from upstream, for instance,
+        // when reading from MetaStorage, it will be encountered by every 
partition future) to avoid noise
+        // in the logs.
+        Set<Throwable> unwrappedCauses = ConcurrentHashMap.newKeySet();
+
+        for (int partId = 0; partId < futures.length; partId++) {
             int finalPartId = partId;
 
-            futures[partId] = tableAssignmentsFut.thenCompose(tableAssignments 
->
-                    // TODO https://issues.apache.org/jira/browse/IGNITE-19763 
We should distinguish empty stable assignments on
-                    // TODO node recovery in case of interrupted table 
creation, and moving from empty assignments to non-empty.
-                    tableAssignments.isEmpty() ? nullCompletedFuture() : 
updatePendingAssignmentsKeys(
-                            tableDescriptor,
-                            replicaGrpId,
-                            dataNodes,
-                            zoneDescriptor.replicas(),
-                            storageRevision,
-                            metaStorageManager,
-                            finalPartId,
-                            tableAssignments.get(finalPartId).nodes(),
-                            assignmentsTimestamp
-                    ));
+            futures[partId].exceptionally(e -> {
+                Throwable cause = ExceptionUtils.unwrapCause(e);
+
+                if (unwrappedCauses.add(cause)) {
+                    // The exception is specific to this partition.
+                    LOG.error(
+                            "Exception on updating assignments for 
[tableId={}, name={}, partition={}]",
+                            e,
+                            tableDescriptor.id(), tableDescriptor.name(), 
finalPartId
+                    );
+                } else {
+                    // The exception is from upstream and not specific for 
this partition, so don't log the partition index.
+                    LOG.error(
+                            "Exception on updating assignments for 
[tableId={}, name={}]",
+                            e,
+                            tableDescriptor.id(), tableDescriptor.name()
+                    );
+                }
+
+                return null;
+            });
         }
 
-        return futures;
+        return allOf(futures);
     }
 
+    /** Key prefix for planned assignments. */
+    public static final String PLANNED_ASSIGNMENTS_PREFIX = 
"assignments.planned.";
+
     /** Key prefix for pending assignments. */
     public static final String PENDING_ASSIGNMENTS_PREFIX = 
"assignments.pending.";
 
@@ -359,7 +417,7 @@ public class RebalanceUtil {
      * @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
      */
     public static ByteArray plannedPartAssignmentsKey(TablePartitionId partId) 
{
-        return new ByteArray("assignments.planned." + partId);
+        return new ByteArray(PLANNED_ASSIGNMENTS_PREFIX + partId);
     }
 
     /**
@@ -521,54 +579,31 @@ public class RebalanceUtil {
     }
 
     /**
-     * Returns table assignments for table partitions from meta storage.
+     * Returns stable table assignments for table partitions from meta storage.
      *
      * @param metaStorageManager Meta storage manager.
      * @param tableId Table id.
-     * @param partitionIds IDs of partitions to get assignments for. If empty, 
get all partition assignments.
-     * @param numberOfPartitions Number of partitions. Ignored if partition 
IDs are specified.
+     * @param partitionIds IDs of partitions to get assignments for.
      * @return Future with table assignments as a value.
      */
-    public static CompletableFuture<Map<Integer, Assignments>> 
tableAssignments(
+    public static CompletableFuture<Map<Integer, Assignments>> 
tableStableAssignments(
             MetaStorageManager metaStorageManager,
             int tableId,
-            Set<Integer> partitionIds,
-            int numberOfPartitions
+            int[] partitionIds
     ) {
-        IntStream partitionIdsStream = partitionIds.isEmpty()
-                ? IntStream.range(0, numberOfPartitions)
-                : partitionIds.stream().mapToInt(Integer::intValue);
-
-        Map<ByteArray, Integer> partitionKeysToPartitionNumber = 
partitionIdsStream.collect(
-                HashMap::new,
-                (map, partId) -> map.put(stablePartAssignmentsKey(new 
TablePartitionId(tableId, partId)), partId),
-                Map::putAll
-        );
-
-        return 
metaStorageManager.getAll(partitionKeysToPartitionNumber.keySet())
-                .thenApply(entries -> {
-                    if (entries.isEmpty()) {
-                        return Map.of();
-                    }
-
-                    Map<Integer, Assignments> result = new HashMap<>();
-                    int numberOfMsPartitions = 0;
-
-                    for (var mapEntry : entries.entrySet()) {
-                        Entry entry = mapEntry.getValue();
-
-                        if (!entry.empty() && !entry.tombstone()) {
-                            
result.put(partitionKeysToPartitionNumber.get(mapEntry.getKey()), 
Assignments.fromBytes(entry.value()));
-                            numberOfMsPartitions++;
-                        }
-                    }
-
-                    assert numberOfMsPartitions == 0 || numberOfMsPartitions 
== entries.size()
-                            : "Invalid number of stable partition entries 
received from meta storage [received="
-                            + numberOfMsPartitions + ", numberOfPartitions=" + 
entries.size() + ", tableId=" + tableId + "].";
-
-                    return numberOfMsPartitions == 0 ? Map.of() : result;
-                });
+        return metastoreAssignments(
+                metaStorageManager,
+                partitionIds,
+                partitionId -> stablePartAssignmentsKey(new 
TablePartitionId(tableId, partitionId))
+        ).whenComplete((assignmentsMap, throwable) -> {
+            if (throwable == null) {
+                int numberOfMsPartitions = assignmentsMap.size();
+
+                assert numberOfMsPartitions == 0 || numberOfMsPartitions == 
partitionIds.length
+                        : "Invalid number of partition entries received from 
meta storage [received="
+                        + numberOfMsPartitions + ", numberOfPartitions=" + 
partitionIds.length + ", tableId=" + tableId + "].";
+            }
+        });
     }
 
     /**
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 0b7fc2c4a0..1c63cd7bfb 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -66,7 +66,6 @@ import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.MismatchingTransactionOutcomeException;
 import org.apache.ignite.tx.TransactionException;
-import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -383,16 +382,6 @@ public class ItDurableFinishTest extends 
ClusterPerTestIntegrationTest {
         );
     }
 
-    private @Nullable Integer nodeIndex(String name) {
-        for (int i = 0; i < initialNodes(); i++) {
-            if (node(i).name().equals(name)) {
-                return i;
-            }
-        }
-
-        return null;
-    }
-
     private static class Context {
         private final IgniteImpl primaryNode;
         private final IgniteImpl coordinatorNode;
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index 37cd0dc4fb..a47ce774cd 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -357,6 +357,13 @@ public class Cluster {
         return Objects.requireNonNull(nodes.get(index));
     }
 
+    /**
+     * Returns an Ignite node (a member of the cluster) by its index.
+     */
+    public @Nullable Ignite nullableNode(int index) {
+        return nodes.get(index);
+    }
+
     /**
      * Returns a node that is not stopped and not knocked out (so it can be 
used to interact with the cluster).
      */
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 8965cdf32a..5570cf775e 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestInfo;
@@ -220,6 +221,14 @@ public abstract class ClusterPerTestIntegrationTest 
extends BaseIgniteAbstractTe
         return cluster.node(index);
     }
 
+    public @Nullable Ignite nullableNode(int index) {
+        return cluster.nullableNode(index);
+    }
+
+    protected int nodeIndex(String name) {
+        return cluster.nodeIndex(name);
+    }
+
     protected final IgniteImpl igniteImpl(int index) {
         return unwrapIgniteImpl(node(index));
     }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 405e8b2775..caa19b6702 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -29,6 +29,7 @@ import static 
org.apache.ignite.internal.TestWrappers.unwrapTableManager;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
 import static 
org.apache.ignite.internal.replicator.configuration.ReplicationConfigurationSchema.DEFAULT_IDLE_SAFE_TIME_PROP_DURATION;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
@@ -54,6 +55,7 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -63,6 +65,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.TestWrappers;
 import org.apache.ignite.internal.app.IgniteImpl;
@@ -97,7 +100,9 @@ import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
 import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
@@ -151,7 +156,9 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
 
         ZoneParams zoneParams = testMethod.getAnnotation(ZoneParams.class);
 
-        startNodesInParallel(IntStream.range(INITIAL_NODES, 
zoneParams.nodes()).toArray());
+        IntStream.range(INITIAL_NODES, zoneParams.nodes()).forEach(i -> 
cluster.startNode(i));
+        // TODO: IGNITE-22818 Fails with "Race operations took too long"
+        // startNodesInParallel(IntStream.range(INITIAL_NODES, 
zoneParams.nodes()).toArray());
 
         executeSql(format("CREATE ZONE %s with replicas=%d, partitions=%d,"
                         + " data_nodes_auto_adjust_scale_down=%d, 
data_nodes_auto_adjust_scale_up=%d, storage_profiles='%s'",
@@ -236,10 +243,9 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
 
         waitForScale(node0, 3);
 
-        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetPartitions(
+        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetAllPartitions(
                 zoneName,
                 QUALIFIED_TABLE_NAME,
-                Set.of(),
                 true
         );
 
@@ -323,10 +329,9 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
 
         waitForScale(node0, 1);
 
-        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetPartitions(
+        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetAllPartitions(
                 zoneName,
                 QUALIFIED_TABLE_NAME,
-                Set.of(),
                 true
         );
 
@@ -374,7 +379,11 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
         // Second snapshot causes log truncation.
         triggerRaftSnapshot(1, partId);
 
-        unwrapIgniteImpl(node(1)).dropMessages((nodeName, msg) -> 
node(3).name().equals(nodeName) && msg instanceof SnapshotMvDataResponse);
+        unwrapIgniteImpl(node(1)).dropMessages((nodeName, msg) -> {
+            Ignite node = nullableNode(3);
+
+            return node != null && node.name().equals(nodeName) && msg 
instanceof SnapshotMvDataResponse;
+        });
 
         stopNodesInParallel(4, 5);
         waitForScale(node0, 4);
@@ -389,28 +398,29 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
 
         blockRebalanceStableSwitch(partId, assignment013);
 
-        CompletableFuture<Void> resetFuture =
-                node0.disasterRecoveryManager().resetPartitions(zoneName, 
QUALIFIED_TABLE_NAME, emptySet(), true);
+        CompletableFuture<Void> resetFuture = 
node0.disasterRecoveryManager().resetAllPartitions(zoneName, 
QUALIFIED_TABLE_NAME, true);
         assertThat(resetFuture, willCompleteSuccessfully());
 
-        waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
+        waitForPartitionState(node0, partId, 
GlobalPartitionStateEnum.DEGRADED);
 
         var localStatesFut = 
node0.disasterRecoveryManager().localPartitionStates(emptySet(), 
Set.of(node(3).name()), emptySet());
         assertThat(localStatesFut, willCompleteSuccessfully());
 
         Map<TablePartitionId, LocalPartitionStateByNode> localStates = 
localStatesFut.join();
         assertThat(localStates, is(not(anEmptyMap())));
-        assertEquals(LocalPartitionStateEnum.INSTALLING_SNAPSHOT, 
localStates.values().iterator().next().values().iterator().next().state);
+        LocalPartitionStateByNode localPartitionStateByNode = 
localStates.get(new TablePartitionId(tableId, partId));
+
+        assertEquals(LocalPartitionStateEnum.INSTALLING_SNAPSHOT, 
localPartitionStateByNode.values().iterator().next().state);
 
         stopNode(1);
         waitForScale(node0, 3);
 
-        waitForPartitionState(node0, GlobalPartitionStateEnum.DEGRADED);
+        waitForPartitionState(node0, partId, 
GlobalPartitionStateEnum.DEGRADED);
 
-        resetFuture = 
node0.disasterRecoveryManager().resetPartitions(zoneName, QUALIFIED_TABLE_NAME, 
emptySet(), true);
+        resetFuture = 
node0.disasterRecoveryManager().resetAllPartitions(zoneName, 
QUALIFIED_TABLE_NAME, true);
         assertThat(resetFuture, willCompleteSuccessfully());
 
-        waitForPartitionState(node0, GlobalPartitionStateEnum.AVAILABLE);
+        waitForPartitionState(node0, partId, 
GlobalPartitionStateEnum.AVAILABLE);
 
         awaitPrimaryReplica(node0, partId);
         assertRealAssignments(node0, partId, 0, 2, 3);
@@ -455,10 +465,9 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
 
         waitForScale(node0, 3);
 
-        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetPartitions(
+        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetAllPartitions(
                 zoneName,
                 QUALIFIED_TABLE_NAME,
-                Set.of(),
                 false
         );
 
@@ -498,10 +507,9 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
 
         stopNodesInParallel(1);
 
-        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetPartitions(
+        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetAllPartitions(
                 zoneName,
                 QUALIFIED_TABLE_NAME,
-                Set.of(),
                 false
         );
 
@@ -562,38 +570,394 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
 
         assertFalse(getPendingAssignments(node0, partId).force());
 
-        waitForPartitionState(node0, GlobalPartitionStateEnum.AVAILABLE);
+        waitForPartitionState(node0, partId, 
GlobalPartitionStateEnum.AVAILABLE);
 
         executeSql(format("ALTER ZONE %s SET 
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
 
         stopNode(4);
 
         CompletableFuture<Void> resetFuture =
-                node0.disasterRecoveryManager().resetPartitions(zoneName, 
QUALIFIED_TABLE_NAME, emptySet(), false);
+                node0.disasterRecoveryManager().resetAllPartitions(zoneName, 
QUALIFIED_TABLE_NAME, false);
         assertThat(resetFuture, willCompleteSuccessfully());
 
-        Assignments assignmentForced13 = 
Assignments.forced(Set.of(Assignment.forPeer(node(1).name()),
-                Assignment.forPeer(node(3).name())), timestamp);
+        Assignments assignmentForced1 = 
Assignments.forced(Set.of(Assignment.forPeer(node(1).name())), timestamp);
+
+        assertPendingAssignments(node0, partId, assignmentForced1);
+
+        Assignments assignments13 = Assignments.of(Set.of(
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(3).name())
+        ), timestamp);
 
-        assertPendingAssignments(node0, partId, assignmentForced13);
+        assertPlannedAssignments(node0, partId, assignments13);
     }
 
     /**
-     * Block rebalance, so stable won't be switched to specified pending.
+     * The test creates a group of 7 nodes, and performs writes in three steps:
+     * <ol>
+     * <li>Write data to all nodes.</li>
+     * <li>Block raft updates on one of the nodes. Write second portion of 
data to the group.</li>
+     * <li>Additionally block updates on another node. Write third portion of 
data to the group.</li>
+     * </ol>
+     *
+     * <p>As a result, we'll have one node having only data(1), one node 
having data(2) and the other 5 nodes having
+     * data(3).
+     * Then we stop 4 out of 5 nodes with data(3) - effectively the majority 
of 7 nodes, and call resetPartitions.
+     * The two phase reset should pick the node with the highest raft log 
index as the single node for the pending assignments.
      */
-    private void blockRebalanceStableSwitch(int partId, Assignments 
assignment) {
+    @Test
+    @ZoneParams(nodes = 7, replicas = 7, partitions = 1)
+    void testThoPhaseResetMaxLogIndex() throws Exception {
+        int partId = 0;
+
+        IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
+        int catalogVersion = node0.catalogManager().latestCatalogVersion();
+        long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, partId);
+
+        Set<String> clusterNodeNames = Set.of(
+                node(0).name(),
+                node(1).name(),
+                node(2).name(),
+                node(3).name(),
+                node(4).name(),
+                node(5).name(),
+                node(6).name());
+        assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
+
+        Assignments allAssignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(2).name()),
+                Assignment.forPeer(node(3).name()),
+                Assignment.forPeer(node(4).name()),
+                Assignment.forPeer(node(5).name()),
+                Assignment.forPeer(node(6).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, allAssignments);
+
+        // Write data(1) to all seven nodes.
+        List<Throwable> errors = insertValues(table, partId, 0);
+        assertThat(errors, is(empty()));
+
+        TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partId);
+
+        // We filter out the leader to be able to reliably block raft state 
transfer.
+        String leaderName = findLeader(1, partId);
+        logger().info("Raft group leader is [id={}]", leaderName);
+
+        // All the nodes except the leader - 6 nodes.
+        List<String> followerNodes = new ArrayList<>(clusterNodeNames);
+        followerNodes.remove(leaderName);
+
+        // The nodes that we block AppendEntriesRequest to.
+        Set<String> blockedNodes = new ConcurrentHashSet<>();
+
+        // Exclude one of the nodes from data(2).
+        int node0IndexInFollowers = followerNodes.indexOf(node0.name());
+        // Make sure node 0 is not stopped. If node0IndexInFollowers==-1, then 
node0 is the leader.
+        blockedNodes.add(followerNodes.remove(node0IndexInFollowers == -1 ? 0 
: node0IndexInFollowers));
+        logger().info("Blocking updates on nodes [ids={}]", blockedNodes);
+
+        blockMessage((nodeName, msg) -> dataReplicateMessage(nodeName, msg, 
tablePartitionId, blockedNodes));
+
+        // Write data(2) to 6 nodes.
+        errors = insertValues(table, partId, 10);
+        assertThat(errors, is(empty()));
+
+        // Exclude one more node from data replication.
+        blockedNodes.add(followerNodes.remove(0));
+        logger().info("Blocking updates on nodes [ids={}]", blockedNodes);
+
+        // Write data(3) to 5 nodes.
+        errors = insertValues(table, partId, 20);
+        assertThat(errors, is(empty()));
+
+        // Disable scale down.
+        executeSql(format("ALTER ZONE %s SET 
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+        // Now followerNodes has 4 elements - without the leader and two 
blocked nodes. Stop them all.
+        int[] nodesToStop = followerNodes.stream()
+                .mapToInt(this::nodeIndex)
+                .toArray();
+        logger().info("Stopping nodes [id={}]", Arrays.toString(nodesToStop));
+        stopNodesInParallel(nodesToStop);
+
+        // One of them has the most up to date data, the others fall behind.
+        waitForPartitionState(node0, partId, 
GlobalPartitionStateEnum.READ_ONLY);
+
+        // Collect nodes that will be the part of the planned assignments.
+        // These are the leader and two blocked nodes.
+        List<String> nodesNamesForFinalAssignments = new 
ArrayList<>(blockedNodes);
+        nodesNamesForFinalAssignments.add(leaderName);
+
+        // Unblock raft.
+        blockedNodes.clear();
+
+        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetAllPartitions(
+                zoneName,
+                QUALIFIED_TABLE_NAME,
+                true
+        );
+        assertThat(updateFuture, willCompleteSuccessfully());
+
+        // Pending is the one with the most up to date log index.
+        Assignments assignmentPending = 
Assignments.forced(Set.of(Assignment.forPeer(leaderName)), timestamp);
+
+        assertPendingAssignments(node0, partId, assignmentPending);
+
+        Set<Assignment> peers = nodesNamesForFinalAssignments.stream()
+                .map(Assignment::forPeer)
+                .collect(Collectors.toSet());
+
+        Assignments assignmentsPlanned = Assignments.of(peers, timestamp);
+
+        assertPlannedAssignments(node0, partId, assignmentsPlanned);
+
+        // Set the zone replica count to 3 and wait for the new stable assignments to take effect.
+        executeSql(format("ALTER ZONE %s SET replicas=%d", zoneName, 3));
+
+        waitForPartitionState(node0, partId, 
GlobalPartitionStateEnum.AVAILABLE);
+
+        // Make sure the data is present.
+        IntStream.range(0, ENTRIES).forEach(i -> {
+            CompletableFuture<Tuple> fut = table.keyValueView().getAsync(null, 
Tuple.create(of("id", i)));
+            assertThat(fut, willCompleteSuccessfully());
+
+            assertEquals(Tuple.create(of("val", i + 20)), fut.join());
+        });
+    }
+
+    /**
+     * The test creates a group of 7 nodes, and performs writes in two steps:
+     * <ol>
+     * <li>Write data to all nodes.</li>
+     * <li>Block raft updates on one of the nodes. Write second portion of 
data to the group.</li>
+     * </ol>
+     *
+     * <p>As a result, we'll have one node having only data(1) and the other 6 
nodes having
+     * data(2).
+     * Then we stop 4 nodes with data(2) - effectively the majority of 7 
nodes, and call resetPartitions.
+     * The alive up-to-date nodes share the same highest raft log index, so the 
two phase reset is expected to pick the one with the lexicographically smallest name as the single node for the pending assignments.
+     */
+    @Test
+    @ZoneParams(nodes = 7, replicas = 7, partitions = 1)
+    void testThoPhaseResetEqualLogIndex() throws Exception {
+        int partId = 0;
+
+        IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
+        int catalogVersion = node0.catalogManager().latestCatalogVersion();
+        long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+        Table table = node0.tables().table(TABLE_NAME);
+
+        awaitPrimaryReplica(node0, partId);
+
+        Set<String> clusterNodeNames = Set.of(
+                node(0).name(),
+                node(1).name(),
+                node(2).name(),
+                node(3).name(),
+                node(4).name(),
+                node(5).name(),
+                node(6).name());
+        assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
+
+        Assignments allAssignments = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(2).name()),
+                Assignment.forPeer(node(3).name()),
+                Assignment.forPeer(node(4).name()),
+                Assignment.forPeer(node(5).name()),
+                Assignment.forPeer(node(6).name())
+        ), timestamp);
+
+        assertStableAssignments(node0, partId, allAssignments);
+        // Write data(1) to all seven nodes.
+        List<Throwable> errors = insertValues(table, partId, 0);
+        assertThat(errors, is(empty()));
+
+        TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partId);
+
+        // We filter out the leader to be able to reliably block raft state 
transfer on the other nodes.
+        String leaderName = findLeader(1, partId);
+        logger().info("Raft group leader is [id={}]", leaderName);
+
+        // All the nodes except the leader - 6 nodes.
+        List<String> followerNodes = new ArrayList<>(clusterNodeNames);
+        followerNodes.remove(leaderName);
+
+        // The nodes that we block AppendEntriesRequest to.
+        Set<String> blockedNodes = new ConcurrentHashSet<>();
+
+        // Exclude one of the nodes from data(2).
+        int node0IndexInFollowers = followerNodes.indexOf(node0.name());
+        // Make sure node 0 is not stopped. If node0IndexInFollowers==-1, then 
node0 is the leader.
+        String blockedNode = followerNodes.remove(node0IndexInFollowers == -1 
? 0 : node0IndexInFollowers);
+        blockedNodes.add(blockedNode);
+        logger().info("Blocking updates on nodes [ids={}]", blockedNodes);
+
+        blockMessage((nodeName, msg) -> dataReplicateMessage(nodeName, msg, 
tablePartitionId, blockedNodes));
+
+        // Write data(2) to 6 nodes.
+        errors = insertValues(table, partId, 10);
+        assertThat(errors, is(empty()));
+
+        // Disable scale down.
+        executeSql(format("ALTER ZONE %s SET 
data_nodes_auto_adjust_scale_down=%d", zoneName, INFINITE_TIMER_VALUE));
+
+        List<String> nodeNamesToStop = new ArrayList<>(followerNodes);
+        // Make sure there are 4 elements in this list.
+        nodeNamesToStop.remove(0);
+        // Stop them all.
+        int[] nodesToStop = nodeNamesToStop.stream()
+                .mapToInt(this::nodeIndex)
+                .toArray();
+        logger().info("Stopping nodes [id={}]", Arrays.toString(nodesToStop));
+        stopNodesInParallel(nodesToStop);
+
+        // Unblock raft.
+        blockedNodes.clear();
+
+        // Two nodes should have the most up to date data, one falls behind.
+        waitForPartitionState(node0, partId, 
GlobalPartitionStateEnum.READ_ONLY);
+
+        // Collect nodes that will be the part of the planned assignments.
+        List<String> nodesNamesForFinalAssignments = new 
ArrayList<>(clusterNodeNames);
+        nodesNamesForFinalAssignments.removeAll(nodeNamesToStop);
+
+        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetAllPartitions(
+                zoneName,
+                QUALIFIED_TABLE_NAME,
+                false
+        );
+        assertThat(updateFuture, willCompleteSuccessfully());
+
+        String pendingNodeName = 
getPendingNodeName(nodesNamesForFinalAssignments, blockedNode);
+
+        // Pending is the one with the most up to date log index.
+        Assignments assignmentPending = 
Assignments.forced(Set.of(Assignment.forPeer(pendingNodeName)), timestamp);
+
+        assertPendingAssignments(node0, partId, assignmentPending);
+
+        Set<Assignment> peers = nodesNamesForFinalAssignments.stream()
+                .map(Assignment::forPeer)
+                .collect(Collectors.toSet());
+
+        Assignments assignmentsPlanned = Assignments.of(peers, timestamp);
+
+        assertPlannedAssignments(node0, partId, assignmentsPlanned);
+
+        // Set the zone replica count to 3 and wait for the new stable assignments to take effect.
+        executeSql(format("ALTER ZONE %s SET replicas=%d", zoneName, 3));
+
+        waitForPartitionState(node0, partId, 
GlobalPartitionStateEnum.AVAILABLE);
+
+        // Make sure the data is present.
+        IntStream.range(0, ENTRIES).forEach(i -> {
+            CompletableFuture<Tuple> fut = table.keyValueView().getAsync(null, 
Tuple.create(of("id", i)));
+            assertThat(fut, willCompleteSuccessfully());
+
+            assertEquals(Tuple.create(of("val", i + 10)), fut.join());
+        });
+    }
+
+    @Test
+    @ZoneParams(nodes = 6, replicas = 3, partitions = 1)
+    void testTwoPhaseResetOnEmptyNodes() throws Exception {
+        int partId = 0;
+
+        IgniteImpl node0 = unwrapIgniteImpl(cluster.node(0));
+        int catalogVersion = node0.catalogManager().latestCatalogVersion();
+        long timestamp = node0.catalogManager().catalog(catalogVersion).time();
+
+        awaitPrimaryReplica(node0, partId);
+
+        assertRealAssignments(node0, partId, 2, 3, 5);
+
+        Assignments blockedRebalance = Assignments.of(timestamp,
+                Assignment.forPeer(node(0).name())
+        );
+
+        blockRebalanceStableSwitch(partId, blockedRebalance);
+
+        stopNodesInParallel(2, 3, 5);
+
+        waitForScale(node0, 3);
+
+        CompletableFuture<?> updateFuture = 
node0.disasterRecoveryManager().resetAllPartitions(
+                zoneName,
+                QUALIFIED_TABLE_NAME,
+                true
+        );
+
+        assertThat(updateFuture, willCompleteSuccessfully());
+
+        awaitPrimaryReplica(node0, partId);
+
+        assertRealAssignments(node0, partId, 0, 1, 4);
+
+        Assignments assignmentForced1 = 
Assignments.forced(Set.of(Assignment.forPeer(node(0).name())), timestamp);
+
+        assertPendingAssignments(node0, partId, assignmentForced1);
+
+        Assignments assignments13 = Assignments.of(Set.of(
+                Assignment.forPeer(node(0).name()),
+                Assignment.forPeer(node(1).name()),
+                Assignment.forPeer(node(4).name())
+        ), timestamp);
+
+        assertPlannedAssignments(node0, partId, assignments13);
+    }
+
+    private static String getPendingNodeName(List<String> aliveNodes, String 
blockedNode) {
+        List<String> candidates = new ArrayList<>(aliveNodes);
+        candidates.remove(blockedNode);
+
+        // Without the blocked node, the other two should have the same raft 
log index.
+        // Pick the one whose name is lexicographically smallest.
+        candidates.sort(String::compareTo);
+
+        return candidates.get(0);
+    }
+
+    private void blockMessage(BiPredicate<String, NetworkMessage> predicate) {
         
cluster.runningNodes().map(TestWrappers::unwrapIgniteImpl).forEach(node -> {
-            BiPredicate<String, NetworkMessage> newPredicate = (nodeName, msg) 
-> stableKeySwitchMessage(msg, partId, assignment);
             BiPredicate<String, NetworkMessage> oldPredicate = 
node.dropMessagesPredicate();
 
             if (oldPredicate == null) {
-                node.dropMessages(newPredicate);
+                node.dropMessages(predicate);
             } else {
-                node.dropMessages(oldPredicate.or(newPredicate));
+                node.dropMessages(oldPredicate.or(predicate));
             }
         });
     }
 
+    private static boolean dataReplicateMessage(
+            String nodeName,
+            NetworkMessage msg,
+            TablePartitionId tablePartitionId,
+            Set<String> blockedNodes
+    ) {
+        if (msg instanceof AppendEntriesRequest) {
+            var appendEntriesRequest = (AppendEntriesRequest) msg;
+            if 
(tablePartitionId.toString().equals(appendEntriesRequest.groupId())) {
+                return blockedNodes.contains(nodeName);
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Block rebalance, so stable won't be switched to specified pending.
+     */
+    private void blockRebalanceStableSwitch(int partId, Assignments 
assignment) {
+        blockMessage((nodeName, msg) -> stableKeySwitchMessage(msg, partId, 
assignment));
+    }
+
     private boolean stableKeySwitchMessage(NetworkMessage msg, int partId, 
Assignments blockedAssignments) {
         if (msg instanceof WriteActionRequest) {
             var writeActionRequest = (WriteActionRequest) msg;
@@ -624,7 +988,7 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
         return false;
     }
 
-    private void waitForPartitionState(IgniteImpl node0, 
GlobalPartitionStateEnum expectedState) throws InterruptedException {
+    private void waitForPartitionState(IgniteImpl node0, int partId, 
GlobalPartitionStateEnum expectedState) throws InterruptedException {
         assertTrue(waitForCondition(() -> {
             CompletableFuture<Map<TablePartitionId, GlobalPartitionState>> 
statesFuture = node0.disasterRecoveryManager()
                     .globalPartitionStates(Set.of(zoneName), emptySet());
@@ -633,15 +997,26 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
 
             Map<TablePartitionId, GlobalPartitionState> map = 
statesFuture.join();
 
-            GlobalPartitionState state = map.values().iterator().next();
+            GlobalPartitionState state = map.get(new TablePartitionId(tableId, 
partId));
 
-            return state.state == expectedState;
+            return state != null && state.state == expectedState;
         }, 500, 20_000),
                 () -> "Expected state: " + expectedState
                         + ", actual: " + 
node0.disasterRecoveryManager().globalPartitionStates(Set.of(zoneName), 
emptySet()).join()
         );
     }
 
+    private String findLeader(int nodeIdx, int partId) {
+        IgniteImpl node = unwrapIgniteImpl(node(nodeIdx));
+
+        var raftNodeId = new RaftNodeId(new TablePartitionId(tableId, partId), 
new Peer(node.name()));
+        var jraftServer = (JraftServerImpl) node.raftManager().server();
+
+        RaftGroupService raftGroupService = 
jraftServer.raftGroupService(raftNodeId);
+        assertNotNull(raftGroupService);
+        return raftGroupService.getRaftNode().getLeaderId().getConsistentId();
+    }
+
     private void triggerRaftSnapshot(int nodeIdx, int partId) throws 
InterruptedException, ExecutionException {
         //noinspection resource
         IgniteImpl node = unwrapIgniteImpl(node(nodeIdx));
@@ -680,6 +1055,20 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         );
     }
 
+    private void assertPlannedAssignments(IgniteImpl node0, int partId, 
Assignments expected) throws InterruptedException {
+        assertTrue(
+                waitForCondition(() -> 
expected.equals(getPlannedAssignments(node0, partId)), 2000),
+                () -> "Expected: " + expected + ", actual: " + 
getPlannedAssignments(node0, partId)
+        );
+    }
+
+    private void assertStableAssignments(IgniteImpl node0, int partId, 
Assignments expected) throws InterruptedException {
+        assertTrue(
+                waitForCondition(() -> 
expected.equals(getStableAssignments(node0, partId)), 2000),
+                () -> "Expected: " + expected + ", actual: " + 
getStableAssignments(node0, partId)
+        );
+    }
+
     /**
      * Inserts {@value ENTRIES} values into a table, expecting either a 
success or specific set of exceptions that would indicate
      * replication issues. Collects such exceptions into a list and returns. 
Fails if unexpected exception happened.
@@ -776,8 +1165,19 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
                 .collect(Collectors.toList());
     }
 
-    private @Nullable Assignments getPendingAssignments(IgniteImpl node0, int 
partId) {
-        CompletableFuture<Entry> pendingFut = node0.metaStorageManager()
+    private @Nullable Assignments getPlannedAssignments(IgniteImpl node, int 
partId) {
+        CompletableFuture<Entry> plannedFut = node.metaStorageManager()
+                .get(plannedPartAssignmentsKey(new TablePartitionId(tableId, 
partId)));
+
+        assertThat(plannedFut, willCompleteSuccessfully());
+
+        Entry planned = plannedFut.join();
+
+        return planned.empty() ? null : Assignments.fromBytes(planned.value());
+    }
+
+    private @Nullable Assignments getPendingAssignments(IgniteImpl node, int 
partId) {
+        CompletableFuture<Entry> pendingFut = node.metaStorageManager()
                 .get(pendingPartAssignmentsKey(new TablePartitionId(tableId, 
partId)));
 
         assertThat(pendingFut, willCompleteSuccessfully());
@@ -787,8 +1187,8 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
         return pending.empty() ? null : Assignments.fromBytes(pending.value());
     }
 
-    private @Nullable Assignments getStableAssignments(IgniteImpl node0, int 
partId) {
-        CompletableFuture<Entry> stableFut = node0.metaStorageManager()
+    private @Nullable Assignments getStableAssignments(IgniteImpl node, int 
partId) {
+        CompletableFuture<Entry> stableFut = node.metaStorageManager()
                 .get(stablePartAssignmentsKey(new TablePartitionId(tableId, 
partId)));
 
         assertThat(stableFut, willCompleteSuccessfully());
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
index 74994de8c7..4b99b1ea8f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.table.distributed.disaster;
 
 import static java.util.Collections.emptyList;
+import static java.util.Collections.emptySet;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.stream.Collectors.groupingBy;
@@ -241,6 +242,21 @@ public class DisasterRecoveryManager implements 
IgniteComponent, SystemViewProvi
         );
     }
 
+    /**
+     * Updates assignments of all partitions of the table in a forced manner, allowing for the 
recovery of raft groups with lost majorities. It is achieved via
+     * triggering a new rebalance with {@code force} flag enabled in {@link 
Assignments} for partitions where it's required. New pending
+     * assignments with {@code force} flag remove old stable nodes from the 
distribution, and force new Raft configuration via "resetPeers"
+     * so that a new leader could be elected.
+     *
+     * @param zoneName Name of the distribution zone. Case-sensitive, without 
quotes.
+     * @param tableName Fully-qualified table name. Case-sensitive, without 
quotes. Example: "PUBLIC.Foo".
+     * @param manualUpdate Whether the update is triggered manually by user or 
automatically by core logic.
+     * @return Future that completes when partitions are reset.
+     */
+    public CompletableFuture<Void> resetAllPartitions(String zoneName, String 
tableName, boolean manualUpdate) {
+        return resetPartitions(zoneName, tableName, emptySet(), manualUpdate);
+    }
+
     /**
      * Updates assignments of the table in a forced manner, allowing for the 
recovery of raft group with lost majorities. It is achieved via
      * triggering a new rebalance with {@code force} flag enabled in {@link 
Assignments} for partitions where it's required. New pending
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
index 2abaffaf99..f97386ee8a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GroupUpdateRequest.java
@@ -28,7 +28,7 @@ import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUt
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableAssignments;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.tableStableAssignments;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
@@ -44,22 +44,26 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
 import static 
org.apache.ignite.lang.ErrorGroups.DisasterRecovery.CLUSTER_NOT_IDLE_ERR;
 
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
-import java.util.stream.IntStream;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.distributionzones.NodeWithAttributes;
+import org.apache.ignite.internal.distributionzones.rebalance.AssignmentUtil;
 import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
 import 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.UpdateStatus;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.dsl.Iif;
 import org.apache.ignite.internal.metastorage.dsl.StatementResult;
@@ -74,6 +78,8 @@ import org.apache.ignite.internal.util.CollectionUtils;
 import org.jetbrains.annotations.Nullable;
 
 class GroupUpdateRequest implements DisasterRecoveryRequest {
+    private static final IgniteLogger LOG = 
Loggers.forClass(GroupUpdateRequest.class);
+
     private final UUID operationId;
 
     /**
@@ -155,7 +161,9 @@ class GroupUpdateRequest implements DisasterRecoveryRequest 
{
                     .map(NodeWithAttributes::nodeName)
                     .collect(toSet());
 
-            CompletableFuture<?>[] futures = forceAssignmentsUpdate(
+            int[] partitionIdsArray = 
AssignmentUtil.partitionIds(partitionIds, zoneDescriptor.partitions());
+
+            return forceAssignmentsUpdate(
                     tableDescriptor,
                     zoneDescriptor,
                     dataNodes,
@@ -163,11 +171,18 @@ class GroupUpdateRequest implements 
DisasterRecoveryRequest {
                     msRevision,
                     disasterRecoveryManager.metaStorageManager,
                     localStatesMap,
-                    catalog.time()
+                    catalog.time(),
+                    partitionIdsArray,
+                    manualUpdate
             );
-
-            return allOf(futures);
-        }).thenCompose(Function.identity());
+        })
+        .thenCompose(Function.identity())
+        .whenComplete((unused, throwable) -> {
+            // TODO: IGNITE-23635 Add fail handling for failed resetPeers
+            if (throwable != null) {
+                LOG.error("Failed to reset partition", throwable);
+            }
+        });
     }
 
     /**
@@ -181,10 +196,11 @@ class GroupUpdateRequest implements 
DisasterRecoveryRequest {
      * @param aliveNodesConsistentIds Set of alive nodes according to logical 
topology.
      * @param revision Meta-storage revision to be associated with 
reassignment.
      * @param metaStorageManager Meta-storage manager.
-     * @param localStatesMap Local partition states retrieved by {@link 
DisasterRecoveryManager#localPartitionStates(Set, Set, Set)}.
+     * @param localStatesMap Local partition states retrieved by
+     *         {@link DisasterRecoveryManager#localPartitionStates(Set, Set, 
Set)}.
      * @return A future that will be completed when reassignments data is 
written into a meta-storage, if that's required.
      */
-    private CompletableFuture<?>[] forceAssignmentsUpdate(
+    private static CompletableFuture<Void> forceAssignmentsUpdate(
             CatalogTableDescriptor tableDescriptor,
             CatalogZoneDescriptor zoneDescriptor,
             Set<String> dataNodes,
@@ -192,47 +208,71 @@ class GroupUpdateRequest implements 
DisasterRecoveryRequest {
             long revision,
             MetaStorageManager metaStorageManager,
             Map<TablePartitionId, LocalPartitionStateMessageByNode> 
localStatesMap,
-            long assignmentsTimestamp
+            long assignmentsTimestamp,
+            int[] partitionIds,
+            boolean manualUpdate
     ) {
-        CompletableFuture<Map<Integer, Assignments>> tableAssignmentsFut = 
tableAssignments(
-                metaStorageManager,
-                tableDescriptor.id(),
-                partitionIds,
-                zoneDescriptor.partitions()
-        );
-
-        Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes, 
aliveNodesConsistentIds);
-
-        int[] partitionIdsArray = partitionIds.isEmpty()
-                ? IntStream.range(0, zoneDescriptor.partitions()).toArray()
-                : partitionIds.stream().mapToInt(Integer::intValue).toArray();
-
-        CompletableFuture<?>[] futures = new 
CompletableFuture[partitionIdsArray.length];
-
-        for (int partitionId = 0; partitionId < partitionIdsArray.length; 
partitionId++) {
-            TablePartitionId replicaGrpId = new 
TablePartitionId(tableDescriptor.id(), partitionIdsArray[partitionId]);
+        return tableStableAssignments(metaStorageManager, 
tableDescriptor.id(), partitionIds)
+                .thenCompose(tableAssignments -> {
+                    if (tableAssignments.isEmpty()) {
+                        return nullCompletedFuture();
+                    }
 
-            futures[partitionId] = 
tableAssignmentsFut.thenCompose(tableAssignments ->
-                    tableAssignments.isEmpty() ? nullCompletedFuture() : 
partitionUpdate(
-                            replicaGrpId,
-                            aliveDataNodes,
+                    return updateAssignments(
+                            tableDescriptor,
+                            zoneDescriptor,
+                            dataNodes,
                             aliveNodesConsistentIds,
-                            zoneDescriptor.replicas(),
                             revision,
                             metaStorageManager,
-                            
tableAssignments.get(replicaGrpId.partitionId()).nodes(),
-                            localStatesMap.get(replicaGrpId),
+                            localStatesMap,
                             assignmentsTimestamp,
-                            this.manualUpdate
-                    )).thenAccept(res -> {
-                        DisasterRecoveryManager.LOG.info(
-                                "Partition {} returned {} status on reset 
attempt", replicaGrpId, UpdateStatus.valueOf(res)
-                        );
-                    }
-            );
+                            partitionIds,
+                            tableAssignments,
+                            manualUpdate
+                    );
+                });
+    }
+
+    private static CompletableFuture<Void> updateAssignments(
+            CatalogTableDescriptor tableDescriptor,
+            CatalogZoneDescriptor zoneDescriptor,
+            Set<String> dataNodes,
+            Set<String> aliveNodesConsistentIds,
+            long revision,
+            MetaStorageManager metaStorageManager,
+            Map<TablePartitionId, LocalPartitionStateMessageByNode> 
localStatesMap,
+            long assignmentsTimestamp,
+            int[] partitionIds,
+            Map<Integer, Assignments> tableAssignments,
+            boolean manualUpdate
+    ) {
+        Set<String> aliveDataNodes = CollectionUtils.intersect(dataNodes, 
aliveNodesConsistentIds);
+
+        CompletableFuture<?>[] futures = new 
CompletableFuture[partitionIds.length];
+
+        for (int i = 0; i < partitionIds.length; i++) {
+            TablePartitionId replicaGrpId = new 
TablePartitionId(tableDescriptor.id(), partitionIds[i]);
+
+            futures[i] = partitionUpdate(
+                    replicaGrpId,
+                    aliveDataNodes,
+                    aliveNodesConsistentIds,
+                    zoneDescriptor.replicas(),
+                    revision,
+                    metaStorageManager,
+                    tableAssignments.get(replicaGrpId.partitionId()).nodes(),
+                    localStatesMap.get(replicaGrpId),
+                    assignmentsTimestamp,
+                    manualUpdate
+            ).thenAccept(res -> {
+                DisasterRecoveryManager.LOG.info(
+                        "Partition {} returned {} status on reset attempt", 
replicaGrpId, UpdateStatus.valueOf(res)
+                );
+            });
         }
 
-        return futures;
+        return allOf(futures);
     }
 
     private static CompletableFuture<Integer> partitionUpdate(
@@ -254,44 +294,53 @@ class GroupUpdateRequest implements 
DisasterRecoveryRequest {
             return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
         }
 
-        Iif invokeClosure;
-
-        if (aliveStableNodes.isEmpty()) {
-            if (!manualUpdate) {
-                return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
-            }
+        if (aliveStableNodes.isEmpty() && !manualUpdate) {
+            return completedFuture(ASSIGNMENT_NOT_UPDATED.ordinal());
+        }
 
+        if (manualUpdate) {
             enrichAssignments(partId, aliveDataNodes, replicas, 
partAssignments);
+        }
 
-            // There are no known nodes with data, which means that we can 
just put new assignments into pending assignments with "forced"
-            // flag.
-            invokeClosure = prepareMsInvokeClosure(
-                    partId,
-                    longToBytesKeepingOrder(revision),
-                    Assignments.forced(partAssignments, 
assignmentsTimestamp).toBytes(),
-                    null
-            );
-        } else {
-            Set<Assignment> stableAssignments = Set.copyOf(partAssignments);
-
-            if (manualUpdate) {
-                enrichAssignments(partId, aliveDataNodes, replicas, 
partAssignments);
-            }
+        Assignment nextAssignment = 
nextAssignment(localPartitionStateMessageByNode, partAssignments);
 
-            // There are nodes with data, and we set pending assignments to 
this set of nodes. It'll be the source of peers for
-            // "resetPeers", and after that new assignments with restored 
replica factor wil be picked up from planned assignments
-            // for the case of the manual update, that was triggered by a user.
-            invokeClosure = prepareMsInvokeClosure(
-                    partId,
-                    longToBytesKeepingOrder(revision),
-                    Assignments.forced(stableAssignments, 
assignmentsTimestamp).toBytes(),
-                    Assignments.toBytes(partAssignments, assignmentsTimestamp)
-            );
-        }
+        // There are nodes with data, and we set pending assignments to this 
set of nodes. It'll be the source of peers for
+        // "resetPeers", and after that new assignments with restored replica 
factor will be picked up from planned assignments
+        // for the case of the manual update, that was triggered by a user.
+        Iif invokeClosure = prepareMsInvokeClosure(
+                partId,
+                longToBytesKeepingOrder(revision),
+                Assignments.forced(Set.of(nextAssignment), 
assignmentsTimestamp).toBytes(),
+                Assignments.toBytes(partAssignments, assignmentsTimestamp)
+        );
 
         return 
metaStorageMgr.invoke(invokeClosure).thenApply(StatementResult::getAsInt);
     }
 
+    /**
+     * Returns an assignment with the most up to date log index; if there is 
more than one node with the same index,
+     * returns the first one in the lexicographic order.
+     */
+    private static Assignment nextAssignment(
+            LocalPartitionStateMessageByNode localPartitionStateByNode,
+            Set<Assignment> assignments
+    ) {
+        // For nodes that we know log index for (having any data), we choose 
the node with the highest log index.
+        // If there is more than one node with the same log index, we choose the 
one with the first consistent id in the lexicographic order.
+        Optional<Assignment> nodeWithMaxLogIndex = assignments.stream()
+                .filter(assignment -> 
localPartitionStateByNode.partitionState(assignment.consistentId()) != null)
+                .min(Comparator.<Assignment>comparingLong(
+                                node -> 
localPartitionStateByNode.partitionState(node.consistentId()).logIndex()
+                        )
+                        .reversed()
+                        .thenComparing(Assignment::consistentId))
+                // If there are no nodes with data, we choose the node with 
the first consistent id in the lexicographic order.
+                .or(() -> 
assignments.stream().min(Comparator.comparing(Assignment::consistentId)));
+
+        // TODO: IGNITE-23737 The case with no suitable nodes should be 
handled.
+        return nodeWithMaxLogIndex.orElseThrow();
+    }
+
     /**
      * Creates an {@link Iif} instance for meta-storage's {@link 
MetaStorageManager#invoke(Iif)} call. Does the following:
      * <ul>
@@ -371,8 +420,7 @@ class GroupUpdateRequest implements DisasterRecoveryRequest 
{
             int replicas,
             Set<Assignment> partAssignments
     ) {
-        Set<Assignment> calcAssignments = 
calculateAssignmentForPartition(aliveDataNodes, partId.partitionId(),
-                replicas);
+        Set<Assignment> calcAssignments = 
calculateAssignmentForPartition(aliveDataNodes, partId.partitionId(), replicas);
 
         for (Assignment calcAssignment : calcAssignments) {
             if (partAssignments.size() == replicas) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
index 4e0fc5f228..b1ba02cad3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateMessageByNode.java
@@ -51,6 +51,15 @@ public class LocalPartitionStateMessageByNode {
         map.put(nodeName, state);
     }
 
+    /**
+     * Returns node state mapping.
+     *
+     * @param node Consistent ID of the node.
+     */
+    public LocalPartitionStateMessage partitionState(String node) {
+        return map.get(node);
+    }
+
     @Override
     public String toString() {
         return map.toString();

Reply via email to