This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0fc37b6e48 IGNITE-23215 Implement MetastorageRepair (#4403)
0fc37b6e48 is described below

commit 0fc37b6e48d7a2bf0ed6c8d02cce744e407ec2ef
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Sep 18 14:44:15 2024 +0400

    IGNITE-23215 Implement MetastorageRepair (#4403)
---
 .../management/ClusterManagementGroupManager.java  |  18 ++
 .../cluster/management/raft/CmgRaftService.java    |  13 ++
 .../impl/ItMetaStorageMaintenanceTest.java         |  15 +-
 .../metastorage/impl/MetaStorageManagerImpl.java   |   7 +-
 .../impl/MetastorageGroupMaintenance.java          |   8 +-
 .../apache/ignite/internal/raft/IndexWithTerm.java |  41 +++-
 .../internal/raft/service/LeaderWithTerm.java      |   6 +
 .../ignite/internal/raft/IndexWithTermTest.java}   |  32 ++--
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |   8 +-
 modules/system-disaster-recovery-api/build.gradle  |   1 +
 .../message/BecomeMetastorageLeaderMessage.java    |   8 +-
 .../MetastorageRepair.java}                        |  21 ++-
 .../disaster/system/MetastorageRepairImpl.java     | 192 +++++++++++++++++++
 .../system/SystemDisasterRecoveryManagerImpl.java  |  25 ++-
 .../disaster/system/MetastorageRepairImplTest.java | 209 +++++++++++++++++++++
 .../SystemDisasterRecoveryManagerImplTest.java     |  29 +--
 16 files changed, 572 insertions(+), 61 deletions(-)

diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index a456fe8bf3..08ca0d36c9 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -1057,6 +1057,24 @@ public class ClusterManagementGroupManager extends 
AbstractEventProducer<Cluster
         }
     }
 
+    /**
+     * Changes metastorage nodes in the CMG.
+     *
+     * @return Future that completes when the command is executed by the CMG.
+     */
+    public CompletableFuture<Void> changeMetastorageNodes(Set<String> 
newMetastorageNodes) {
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        try {
+            return raftServiceAfterJoin()
+                    .thenCompose(service -> 
service.changeMetastorageNodes(newMetastorageNodes));
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
     /**
      * Returns a future resolving to the initial cluster configuration in 
HOCON format. The resulting configuration may be {@code null} if
      * not provided by the user.
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
index b89322943a..b41948903c 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
@@ -32,6 +32,7 @@ import 
org.apache.ignite.internal.cluster.management.ClusterState;
 import org.apache.ignite.internal.cluster.management.ClusterTag;
 import org.apache.ignite.internal.cluster.management.NodeAttributes;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import 
org.apache.ignite.internal.cluster.management.raft.commands.ChangeMetastorageNodesCommand;
 import 
org.apache.ignite.internal.cluster.management.raft.commands.ClusterNodeMessage;
 import 
org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand;
 import 
org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
@@ -324,6 +325,18 @@ public class CmgRaftService implements ManuallyCloseable {
         }
     }
 
+    /**
+     * Changes Metastorage nodes.
+     *
+     * @return Future that completes when the change is finished.
+     */
+    public CompletableFuture<Void> changeMetastorageNodes(Set<String> 
newMetastorageNodes) {
+        ChangeMetastorageNodesCommand command = 
msgFactory.changeMetastorageNodesCommand()
+                .metaStorageNodes(Set.copyOf(newMetastorageNodes))
+                .build();
+        return raftService.run(command);
+    }
+
     @Override
     public void close() {
         raftService.shutdown();
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
index f7acce025e..6705f543e1 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl;
 
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
@@ -58,11 +59,15 @@ class ItMetaStorageMaintenanceTest extends 
ItMetaStorageMultipleNodesAbstractTes
         // Metastorage does not work anymore.
         assertThatMetastorageHasNoMajority(node0);
 
-        assertThat(node0.metaStorageManager.becomeLonelyLeader(true), 
willCompleteSuccessfully());
+        assertThat(node0.metaStorageManager.becomeLonelyLeader(1, 
allNodeNames()), willCompleteSuccessfully());
 
         assertThatMetastorageHasMajority(node0);
     }
 
+    private Set<String> allNodeNames() {
+        return nodes.stream().map(Node::name).collect(toSet());
+    }
+
     /**
      * Starts 3 voting Raft nodes. 'Voting' here is opposed to a 'learner' 
node which does not vote.
      *
@@ -110,7 +115,7 @@ class ItMetaStorageMaintenanceTest extends 
ItMetaStorageMultipleNodesAbstractTes
         // Stop the majority.
         stopAllNodesExcept0();
 
-        assertThat(node0.metaStorageManager.becomeLonelyLeader(true), 
willCompleteSuccessfully());
+        assertThat(node0.metaStorageManager.becomeLonelyLeader(1, 
allNodeNames()), willCompleteSuccessfully());
 
         Node node3 = startNode();
 
@@ -130,7 +135,7 @@ class ItMetaStorageMaintenanceTest extends 
ItMetaStorageMultipleNodesAbstractTes
         // Stop the majority.
         stopAllNodesExcept0();
 
-        assertThat(node0.metaStorageManager.becomeLonelyLeader(false), 
willCompleteSuccessfully());
+        assertThat(node0.metaStorageManager.becomeLonelyLeader(1, 
Set.of(node0.name())), willCompleteSuccessfully());
 
         Node node3 = startNode();
 
@@ -159,7 +164,7 @@ class ItMetaStorageMaintenanceTest extends 
ItMetaStorageMultipleNodesAbstractTes
         // Stop the majority.
         stopAllNodesExcept0();
 
-        assertThat(node0.metaStorageManager.becomeLonelyLeader(true), 
willCompleteSuccessfully());
+        assertThat(node0.metaStorageManager.becomeLonelyLeader(1, 
allNodeNames()), willCompleteSuccessfully());
 
         ClusterTime clusterTime0 = node0.metaStorageManager.clusterTime();
 
@@ -198,7 +203,7 @@ class ItMetaStorageMaintenanceTest extends 
ItMetaStorageMultipleNodesAbstractTes
         // Stop the majority.
         stopAllNodesExcept0();
 
-        assertThat(node0.metaStorageManager.becomeLonelyLeader(false), 
willCompleteSuccessfully());
+        assertThat(node0.metaStorageManager.becomeLonelyLeader(1, 
Set.of(node0.name())), willCompleteSuccessfully());
 
         ClusterTime clusterTime0 = node0.metaStorageManager.clusterTime();
         HybridTimestamp timeBeforeOp = clusterTime0.currentSafeTime();
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 7660bdf3d6..7688a2fe11 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -157,7 +157,7 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
      */
     private volatile boolean leaderSecondaryDutiesPaused = false;
 
-    /** Protects {@link #becomeLonelyLeader(boolean)} from concurrent 
executions. */
+    /** Protects {@link #becomeLonelyLeader(long, Set)} from concurrent 
executions. */
     private final Object becomeLonelyLeaderMutex = new Object();
 
     /**
@@ -888,10 +888,11 @@ public class MetaStorageManagerImpl implements 
MetaStorageManager, MetastorageGr
     }
 
     @Override
-    public CompletableFuture<Void> becomeLonelyLeader(boolean 
pauseLeaderSecondaryDuties) {
+    public CompletableFuture<Void> becomeLonelyLeader(long termBeforeChange, 
Set<String> targetVotingSet) {
+        // TODO: IGNITE-22899 - use both parameters.
         return inBusyLockAsync(busyLock, () -> {
             synchronized (becomeLonelyLeaderMutex) {
-                leaderSecondaryDutiesPaused = pauseLeaderSecondaryDuties;
+                leaderSecondaryDutiesPaused = targetVotingSet.size() > 1;
 
                 RaftNodeId raftNodeId = raftNodeId();
                 PeersAndLearners newConfiguration = 
PeersAndLearners.fromPeers(Set.of(raftNodeId.peer()), emptySet());
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetastorageGroupMaintenance.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetastorageGroupMaintenance.java
index d1acc34e4d..a600b7e4d6 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetastorageGroupMaintenance.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetastorageGroupMaintenance.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.raft.IndexWithTerm;
 
@@ -35,8 +36,9 @@ public interface MetastorageGroupMaintenance {
     /**
      * Makes this node a leader of the Metastorage group (with the voting set 
containing of just this node).
      *
-     * @param pauseLeaderSecondaryDuties Whether leader secondary duties 
(managing learners, propagating idle safe time) should be paused.
-     * @return Future which completes when this node becomes a leader.
+     * @param termBeforeChange Term that Metastorage on the target node has 
before we make it become the leader.
+     * @param targetVotingSet Voting set members which we want to achieve 
(becoming a leader is just the first step in doing so).
+     * @return Future which completes when new leader becomes available.
      */
-    CompletableFuture<Void> becomeLonelyLeader(boolean 
pauseLeaderSecondaryDuties);
+    CompletableFuture<Void> becomeLonelyLeader(long termBeforeChange, 
Set<String> targetVotingSet);
 }
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
index b95be4417b..3c40d50546 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.raft;
 /**
  * Raft index with the corresponding term.
  */
-public class IndexWithTerm {
+public class IndexWithTerm implements Comparable<IndexWithTerm> {
     private final long index;
     private final long term;
 
@@ -39,4 +39,43 @@ public class IndexWithTerm {
     public long term() {
         return term;
     }
+
+    @Override
+    public int compareTo(IndexWithTerm that) {
+        int byTerm = Long.compare(this.term, that.term);
+        if (byTerm != 0) {
+            return byTerm;
+        }
+
+        return Long.compare(this.index, that.index);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        IndexWithTerm that = (IndexWithTerm) o;
+
+        if (index != that.index) {
+            return false;
+        }
+        return term == that.term;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = (int) (index ^ (index >>> 32));
+        result = 31 * result + (int) (term ^ (term >>> 32));
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return index + ":" + term;
+    }
 }
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java
 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java
index d22ae5e091..450784cf13 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java
+++ 
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.raft.service;
 
 import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -53,4 +54,9 @@ public class LeaderWithTerm {
     public long term() {
         return term;
     }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
 }
diff --git 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
 
b/modules/raft-api/src/test/java/org/apache/ignite/internal/raft/IndexWithTermTest.java
similarity index 52%
copy from 
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
copy to 
modules/raft-api/src/test/java/org/apache/ignite/internal/raft/IndexWithTermTest.java
index b95be4417b..0e35f4a244 100644
--- 
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
+++ 
b/modules/raft-api/src/test/java/org/apache/ignite/internal/raft/IndexWithTermTest.java
@@ -17,26 +17,22 @@
 
 package org.apache.ignite.internal.raft;
 
-/**
- * Raft index with the corresponding term.
- */
-public class IndexWithTerm {
-    private final long index;
-    private final long term;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 
-    /** Constructor. */
-    public IndexWithTerm(long index, long term) {
-        this.index = index;
-        this.term = term;
-    }
+import org.junit.jupiter.api.Test;
 
-    /** Returns the index. */
-    public long index() {
-        return index;
-    }
+class IndexWithTermTest {
+    @Test
+    void compareToComparesByTermThenByIndex() {
+        assertThat(new IndexWithTerm(5, 1).compareTo(new IndexWithTerm(3, 1)), 
is(greaterThan(0)));
+
+        assertThat(new IndexWithTerm(3, 1).compareTo(new IndexWithTerm(3, 2)), 
is(lessThan(0)));
+        assertThat(new IndexWithTerm(5, 1).compareTo(new IndexWithTerm(3, 2)), 
is(lessThan(0)));
+        assertThat(new IndexWithTerm(5, 1).compareTo(new IndexWithTerm(5, 2)), 
is(lessThan(0)));
 
-    /** Returns the term corresponding to the index. */
-    public long term() {
-        return term;
+        assertThat(new IndexWithTerm(5, 1).compareTo(new IndexWithTerm(5, 1)), 
is(0));
     }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 3bfbae526b..4b7fb58408 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -2778,13 +2778,7 @@ public class NodeImpl implements Node, RaftServerService 
{
 
     @Override
     public long lastLogIndex() {
-        this.readLock.lock();
-        try {
-            return logManager.getLastLogIndex();
-        }
-        finally {
-            this.readLock.unlock();
-        }
+        return lastLogIndexAndTerm().getIndex();
     }
 
     @Override
diff --git a/modules/system-disaster-recovery-api/build.gradle 
b/modules/system-disaster-recovery-api/build.gradle
index 4c93ec2153..ac1901adfc 100644
--- a/modules/system-disaster-recovery-api/build.gradle
+++ b/modules/system-disaster-recovery-api/build.gradle
@@ -26,6 +26,7 @@ dependencies {
     implementation project(':ignite-core')
     implementation project(':ignite-network-api')
     implementation project(':ignite-vault')
+    implementation project(':ignite-raft-api')
     implementation libs.jetbrains.annotations
 
     testImplementation libs.hamcrest.core
diff --git 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/BecomeMetastorageLeaderMessage.java
 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/BecomeMetastorageLeaderMessage.java
index 3d96e5670f..c89db79c5e 100644
--- 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/BecomeMetastorageLeaderMessage.java
+++ 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/BecomeMetastorageLeaderMessage.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.disaster.system.message;
 
+import java.util.Set;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
 
@@ -25,6 +26,9 @@ import 
org.apache.ignite.internal.network.annotations.Transferable;
  */
 @Transferable(SystemDisasterRecoveryMessageGroup.BECOME_METASTORAGE_LEADER)
 public interface BecomeMetastorageLeaderMessage extends NetworkMessage {
-    /** Whether leader secondary duties (managing learners, propagating idle 
safe time) should be paused. */
-    boolean pauseLeaderSecondaryDuties();
+    /** Term that Metastorage on the target node has before we make it become 
the leader. */
+    long termBeforeChange();
+
+    /** Voting set members (aka peers) which we want to achieve (becoming a 
leader is just the first step in doing so). */
+    Set<String> targetVotingSet();
 }
diff --git 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/BecomeMetastorageLeaderMessage.java
 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/repair/MetastorageRepair.java
similarity index 56%
copy from 
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/BecomeMetastorageLeaderMessage.java
copy to 
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/repair/MetastorageRepair.java
index 3d96e5670f..8abab1b601 100644
--- 
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/BecomeMetastorageLeaderMessage.java
+++ 
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/repair/MetastorageRepair.java
@@ -15,16 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.disaster.system.message;
+package org.apache.ignite.internal.disaster.system.repair;
 
-import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.annotations.Transferable;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 /**
- * A command to make a node become a Metastorage Raft group leader.
+ * Metastorage repair logic.
  */
-@Transferable(SystemDisasterRecoveryMessageGroup.BECOME_METASTORAGE_LEADER)
-public interface BecomeMetastorageLeaderMessage extends NetworkMessage {
-    /** Whether leader secondary duties (managing learners, propagating idle 
safe time) should be paused. */
-    boolean pauseLeaderSecondaryDuties();
+public interface MetastorageRepair {
+    /**
+     * Performs Metastorage repair.
+     *
+     * @param participatingNodeNames Names of the nodes that participate in 
the repair.
+     * @param metastorageReplicationFactor Replication factor for Metastorage 
requested by the user.
+     * @return Future that gets completed when repair is initiated.
+     */
+    CompletableFuture<Void> repair(Set<String> participatingNodeNames, int 
metastorageReplicationFactor);
 }
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java
new file mode 100644
index 0000000000..9d4b13e48d
--- /dev/null
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.util.CollectionUtils.difference;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.disaster.system.message.BecomeMetastorageLeaderMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermRequestMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.disaster.system.repair.MetastorageRepair;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.raft.IndexWithTerm;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Implementation of {@link MetastorageRepair}.
+ */
+public class MetastorageRepairImpl implements MetastorageRepair {
+    private static final IgniteLogger LOG = 
Loggers.forClass(MetastorageRepairImpl.class);
+
+    /** Number of seconds to wait for nodes participating in a repair to 
appear. */
+    private static final long WAIT_FOR_NODES_SECONDS = 60;
+
+    private final MessagingService messagingService;
+    private final LogicalTopology logicalTopology;
+    private final ClusterManagementGroupManager cmgManager;
+
+    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new 
SystemDisasterRecoveryMessagesFactory();
+
+    /** Constructor. */
+    public MetastorageRepairImpl(
+            MessagingService messagingService,
+            LogicalTopology logicalTopology,
+            ClusterManagementGroupManager cmgManager
+    ) {
+        this.messagingService = messagingService;
+        this.logicalTopology = logicalTopology;
+        this.cmgManager = cmgManager;
+    }
+
+    @Override
+    public CompletableFuture<Void> repair(Set<String> participatingNodeNames, 
int metastorageReplicationFactor) {
+        LOG.info("Starting MG repair [participatingNodes={}, 
replicationFactor={}].", participatingNodeNames, metastorageReplicationFactor);
+
+        return waitTillValidatedNodesContain(participatingNodeNames)
+                .thenCompose(unused -> 
collectMetastorageIndexes(participatingNodeNames))
+                .thenCompose(indexes -> {
+                    LOG.info("Collected metastorage indexes [indexes={}].", 
indexes);
+
+                    Set<String> newMgNodes = nodesWithBestIndexes(indexes, 
metastorageReplicationFactor);
+                    LOG.info("Chose new MG nodes [mgNodes={}].", newMgNodes);
+
+                    String bestNodeName = chooseNodeWithBestIndex(indexes, 
newMgNodes);
+                    LOG.info("Chose best MG node [node={}].", bestNodeName);
+
+                    return cmgManager.changeMetastorageNodes(newMgNodes)
+                            .thenCompose(unused -> appointLeader(bestNodeName, 
indexes.get(bestNodeName).term(), newMgNodes))
+                            .thenRun(() -> LOG.info("Appointed MG leader 
forcefully [leader={}].", bestNodeName));
+                });
+    }
+
+    private CompletableFuture<Void> waitTillValidatedNodesContain(Set<String> 
nodeNames) {
+        Set<String> cumulativeValidatedNodeNames = 
ConcurrentHashMap.newKeySet();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        LogicalTopologyEventListener listener = new 
LogicalTopologyEventListener() {
+            @Override
+            public void onNodeValidated(LogicalNode validatedNode) {
+                cumulativeValidatedNodeNames.add(validatedNode.name());
+
+                if (isSuperset(cumulativeValidatedNodeNames, nodeNames)) {
+                    future.complete(null);
+                }
+            }
+
+            @Override
+            public void onNodeInvalidated(LogicalNode invalidatedNode) {
+                cumulativeValidatedNodeNames.remove(invalidatedNode.name());
+            }
+        };
+
+        logicalTopology.addEventListener(listener);
+
+        cmgManager.validatedNodes()
+                .thenAccept(validatedNodes -> {
+                    Set<String> validatedNodeNames = validatedNodes.stream()
+                            .map(ClusterNode::name)
+                            .collect(toSet());
+                    if (isSuperset(validatedNodeNames, nodeNames)) {
+                        future.complete(null);
+                    }
+
+                    cumulativeValidatedNodeNames.addAll(validatedNodeNames);
+                });
+
+        return future
+                .orTimeout(WAIT_FOR_NODES_SECONDS, SECONDS)
+                .whenComplete((res, ex) -> {
+                    logicalTopology.removeEventListener(listener);
+
+                    if (ex instanceof TimeoutException) {
+                        LOG.error("Did not see all participating nodes online 
in time, failing Metastorage repair, please try again", ex);
+                    }
+                });
+    }
+
+    private static boolean isSuperset(Set<String> container, Set<String> 
containee) {
+        return difference(containee, container).isEmpty();
+    }
+
+    private CompletableFuture<Map<String, IndexWithTerm>> 
collectMetastorageIndexes(Set<String> participatingNodeNames) {
+        MetastorageIndexTermRequestMessage request = 
messagesFactory.metastorageIndexTermRequestMessage().build();
+
+        Map<String, CompletableFuture<MetastorageIndexTermResponseMessage>> 
responses = new HashMap<>();
+
+        for (String nodeName : participatingNodeNames) {
+            responses.put(
+                    nodeName,
+                    messagingService.invoke(nodeName, request, 
10_000).thenApply(MetastorageIndexTermResponseMessage.class::cast)
+            );
+        }
+
+        return allOf(responses.values()).thenApply(unused -> {
+            return responses.entrySet().stream()
+                    .collect(toMap(Entry::getKey, entry -> 
indexWithTerm(entry.getValue().join())));
+        });
+    }
+
+    private static IndexWithTerm 
indexWithTerm(MetastorageIndexTermResponseMessage message) {
+        return new IndexWithTerm(message.raftIndex(), message.raftTerm());
+    }
+
+    private static Set<String> nodesWithBestIndexes(Map<String, IndexWithTerm> 
indexes, int metastorageReplicationFactor) {
+        return indexes.entrySet().stream()
+                .sorted(Entry.<String, 
IndexWithTerm>comparingByValue().reversed())
+                .limit(metastorageReplicationFactor)
+                .map(Entry::getKey)
+                .collect(toSet());
+    }
+
+    private static String chooseNodeWithBestIndex(Map<String, IndexWithTerm> 
indexes, Set<String> newMgNodes) {
+        return newMgNodes.stream()
+                .max(Comparator.comparing(indexes::get))
+                .orElseThrow();
+    }
+
+    private CompletableFuture<Void> appointLeader(String bestNodeName, long 
termBeforeChange, Set<String> newMgNodes) {
+        LOG.info("Appointing MG leader forcefully [leader={}].", bestNodeName);
+
+        BecomeMetastorageLeaderMessage request = 
messagesFactory.becomeMetastorageLeaderMessage()
+                .termBeforeChange(termBeforeChange)
+                .targetVotingSet(Set.copyOf(newMgNodes))
+                .build();
+
+        return messagingService.invoke(bestNodeName, request, 10_000)
+                .thenApply(message -> null);
+    }
+}
diff --git 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
index 1c51615984..ce190d3090 100644
--- 
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
+++ 
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
@@ -46,6 +46,8 @@ import 
org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
 import 
org.apache.ignite.internal.disaster.system.message.ResetClusterMessageBuilder;
 import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup;
 import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.impl.MetastorageGroupMaintenance;
@@ -61,6 +63,8 @@ import org.jetbrains.annotations.Nullable;
  * Implementation of {@link SystemDisasterRecoveryManager}.
  */
 public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecoveryManager, IgniteComponent {
+    private static final IgniteLogger LOG = 
Loggers.forClass(SystemDisasterRecoveryManagerImpl.class);
+
     private final String thisNodeName;
     private final TopologyService topologyService;
     private final MessagingService messagingService;
@@ -121,7 +125,12 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
                         if (!thisNodeName.equals(sender.name())) {
                             restarter.initiateRestart();
                         }
-                    }, restartExecutor);
+                    }, restartExecutor)
+                    .whenComplete((res, ex) -> {
+                        if (ex != null) {
+                            LOG.error("Error when handling a 
ResetClusterMessage", ex);
+                        }
+                    });
         });
     }
 
@@ -138,12 +147,22 @@ public class SystemDisasterRecoveryManagerImpl implements 
SystemDisasterRecovery
                             .build();
 
                     messagingService.respond(sender, response, correlationId);
+                })
+                .whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        LOG.error("Error when handling a 
MetastorageIndexTermRequestMessage", ex);
+                    }
                 });
     }
 
     private void 
handleBecomeMetastorageLeaderMessage(BecomeMetastorageLeaderMessage message, 
ClusterNode sender, long correlationId) {
-        
metastorageGroupMaintenance.becomeLonelyLeader(message.pauseLeaderSecondaryDuties())
-                .thenRun(() -> messagingService.respond(sender, 
successResponseMessage(), correlationId));
+        
metastorageGroupMaintenance.becomeLonelyLeader(message.termBeforeChange(), 
message.targetVotingSet())
+                .thenRun(() -> messagingService.respond(sender, 
successResponseMessage(), correlationId))
+                .whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        LOG.error("Error when handling a 
BecomeMetastorageLeaderMessage", ex);
+                    }
+                });
     }
 
     @Override
diff --git 
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImplTest.java
 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImplTest.java
new file mode 100644
index 0000000000..28e4cad5b9
--- /dev/null
+++ 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImplTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.disaster.system.message.BecomeMetastorageLeaderMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermRequestMessage;
+import 
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage;
+import 
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class MetastorageRepairImplTest extends BaseIgniteAbstractTest {
+    @Mock
+    private MessagingService messagingService;
+
+    @Mock
+    private LogicalTopology logicalTopology;
+
+    @Mock
+    private ClusterManagementGroupManager cmgManager;
+
+    @InjectMocks
+    private MetastorageRepairImpl repair;
+
+    @Captor
+    private ArgumentCaptor<Set<String>> mgNodesCaptor;
+
+    @Captor
+    private ArgumentCaptor<BecomeMetastorageLeaderMessage> 
becomeLeaderMessageCaptor;
+
+    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new 
SystemDisasterRecoveryMessagesFactory();
+    private final CmgMessagesFactory cmgMessagesFactory = new 
CmgMessagesFactory();
+
+    private final ClusterNode node1 = new 
ClusterNodeImpl(randomUUID().toString(), "node1", new NetworkAddress("host", 
1001));
+    private final ClusterNode node2 = new 
ClusterNodeImpl(randomUUID().toString(), "node2", new NetworkAddress("host", 
1002));
+    private final ClusterNode node3 = new 
ClusterNodeImpl(randomUUID().toString(), "node3", new NetworkAddress("host", 
1003));
+    private final ClusterNode node4 = new 
ClusterNodeImpl(randomUUID().toString(), "node4", new NetworkAddress("host", 
1004));
+
+    @BeforeEach
+    void configureMocks() {
+        lenient().when(messagingService.invoke(any(String.class), 
any(BecomeMetastorageLeaderMessage.class), anyLong()))
+                
.thenReturn(completedFuture(cmgMessagesFactory.successResponseMessage().build()));
+        lenient().when(cmgManager.changeMetastorageNodes(any()))
+                .thenReturn(nullCompletedFuture());
+    }
+
+    @Test
+    void hangsIfParticipatingNodesNeverAppear() {
+        
when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of()));
+
+        assertThat(repair.repair(Set.of("node1", "node2"), 1), 
willTimeoutIn(100, MILLISECONDS));
+    }
+
+    @Test
+    void repairsWithMgFactor1() {
+        
when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of(node1)));
+
+        willRespondWithIndexAndTerm(node1, 10, 1);
+
+        assertThat(repair.repair(Set.of(node1.name()), 1), willSucceedIn(3, 
SECONDS));
+
+        verify(cmgManager).changeMetastorageNodes(mgNodesCaptor.capture());
+        assertThat(mgNodesCaptor.getValue(), contains(node1.name()));
+
+        verify(messagingService).invoke(eq(node1.name()), 
becomeLeaderMessageCaptor.capture(), anyLong());
+        assertThat(becomeLeaderMessageCaptor.getValue().termBeforeChange(), 
is(1L));
+        assertThat(becomeLeaderMessageCaptor.getValue().targetVotingSet(), 
is(Set.of(node1.name())));
+    }
+
+    private void willRespondWithIndexAndTerm(ClusterNode node, int raftIndex, 
int raftTerm) {
+        when(messagingService.invoke(eq(node.name()), 
any(MetastorageIndexTermRequestMessage.class), anyLong()))
+                .thenReturn(completedFuture(indexTermResponse(raftIndex, 
raftTerm)));
+    }
+
+    private MetastorageIndexTermResponseMessage indexTermResponse(int 
raftIndex, int raftTerm) {
+        return 
messagesFactory.metastorageIndexTermResponseMessage().raftIndex(raftIndex).raftTerm(raftTerm).build();
+    }
+
+    @Test
+    void repairsWithMgFactor3() {
+        
when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of(node1, 
node2, node3, node4)));
+
+        willRespondWithIndexAndTerm(node1, 12, 1);
+        willRespondWithIndexAndTerm(node2, 11, 1);
+        willRespondWithIndexAndTerm(node3, 10, 1);
+
+        Set<String> threeNodes = Set.of(node1.name(), node2.name(), 
node3.name());
+        assertThat(repair.repair(threeNodes, 3), willSucceedIn(3, SECONDS));
+
+        verify(cmgManager).changeMetastorageNodes(mgNodesCaptor.capture());
+        assertThat(mgNodesCaptor.getValue(), containsInAnyOrder(node1.name(), 
node2.name(), node3.name()));
+
+        verify(messagingService).invoke(eq(node1.name()), 
becomeLeaderMessageCaptor.capture(), anyLong());
+        assertThat(becomeLeaderMessageCaptor.getValue().termBeforeChange(), 
is(1L));
+        assertThat(becomeLeaderMessageCaptor.getValue().targetVotingSet(), 
is(threeNodes));
+    }
+
+    @Test
+    void proceedsIfParticipatingNodesAppearLaterThanRepairStarts() {
+        
when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of()));
+        doAnswer(invocation -> {
+            LogicalTopologyEventListener listener = invocation.getArgument(0);
+
+            listener.onNodeValidated(new LogicalNode(node1));
+            listener.onNodeValidated(new LogicalNode(node2));
+
+            return null;
+        }).when(logicalTopology).addEventListener(any());
+
+        willRespondWithIndexAndTerm(node1, 10, 1);
+
+        assertThat(repair.repair(Set.of(node1.name()), 1), willSucceedIn(3, 
SECONDS));
+    }
+
+    @Test
+    void waitsTillEveryNodeResponds() {
+        
when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of(node1, 
node2)));
+
+        willRespondWithIndexAndTerm(node1, 10, 1);
+        when(messagingService.invoke(eq(node2.name()), any(), 
anyLong())).thenReturn(new CompletableFuture<>());
+
+        assertThat(repair.repair(Set.of(node1.name(), node2.name()), 1), 
willTimeoutIn(100, MILLISECONDS));
+    }
+
+    @Test
+    void choosesBestMgNodesAndLeader() {
+        
when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of(node1, 
node2, node3)));
+
+        willRespondWithIndexAndTerm(node1, 10, 1);
+        willRespondWithIndexAndTerm(node2, 12, 1);
+        willRespondWithIndexAndTerm(node3, 11, 1);
+
+        assertThat(repair.repair(Set.of(node1.name(), node2.name(), 
node3.name()), 2), willSucceedIn(3, SECONDS));
+
+        verify(cmgManager).changeMetastorageNodes(mgNodesCaptor.capture());
+        assertThat(mgNodesCaptor.getValue(), containsInAnyOrder(node2.name(), 
node3.name()));
+
+        verify(messagingService).invoke(eq(node2.name()), 
becomeLeaderMessageCaptor.capture(), anyLong());
+    }
+
+    @Test
+    void termIsStrongerThanIndexWhenComparing() {
+        
when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of(node1, 
node2, node3)));
+
+        willRespondWithIndexAndTerm(node1, 10, 3);
+        willRespondWithIndexAndTerm(node2, 12, 1);
+        willRespondWithIndexAndTerm(node3, 11, 2);
+
+        assertThat(repair.repair(Set.of(node1.name(), node2.name(), 
node3.name()), 2), willSucceedIn(3, SECONDS));
+
+        verify(cmgManager).changeMetastorageNodes(mgNodesCaptor.capture());
+        assertThat(mgNodesCaptor.getValue(), containsInAnyOrder(node1.name(), 
node3.name()));
+
+        verify(messagingService).invoke(eq(node1.name()), 
becomeLeaderMessageCaptor.capture(), anyLong());
+    }
+}
diff --git 
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
index bddfd1e467..f780a2c48e 100644
--- 
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
+++ 
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
@@ -21,6 +21,7 @@ import static java.util.UUID.randomUUID;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.cluster.management.ClusterTag.randomClusterTag;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrow;
@@ -56,6 +57,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.cluster.management.ClusterState;
 import 
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
 import 
org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
@@ -228,7 +230,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
     @EnumSource(ResetCluster.class)
     void resetClusterRequiresClusterState(ResetCluster action) {
         when(topologyService.allMembers()).thenReturn(List.of(thisNode));
-        markinitConfigApplied();
+        markInitConfigApplied();
 
         ClusterResetException ex = 
assertWillThrow(action.resetCluster(manager, List.of(thisNodeName)), 
ClusterResetException.class);
         assertThat(ex.getMessage(), is("Node does not have cluster state."));
@@ -248,7 +250,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
         vaultManager.put(CLUSTER_STATE_VAULT_KEY, toBytes(usualClusterState));
     }
 
-    private void markinitConfigApplied() {
+    private void markInitConfigApplied() {
         vaultManager.put(INIT_CONFIG_APPLIED_VAULT_KEY, BYTE_EMPTY_ARRAY);
     }
 
@@ -260,7 +262,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
         when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, 
node3));
         prepareNodeStateForClusterReset();
 
-        when(messagingService.invoke(any(ClusterNode.class), any(), anyLong()))
+        when(messagingService.invoke(any(ClusterNode.class), 
any(ResetClusterMessage.class), anyLong()))
                 .thenReturn(completedFuture(successResponseMessage));
 
         CompletableFuture<Void> future = action.resetCluster(manager, 
List.of(thisNodeName, node2.name()));
@@ -283,7 +285,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
     }
 
     private void prepareNodeStateForClusterReset() {
-        markinitConfigApplied();
+        markInitConfigApplied();
         putClusterState();
     }
 
@@ -312,7 +314,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
         when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, 
node3));
         prepareNodeStateForClusterReset();
 
-        when(messagingService.invoke(any(ClusterNode.class), any(), anyLong()))
+        when(messagingService.invoke(any(ClusterNode.class), 
any(ResetClusterMessage.class), anyLong()))
                 .thenReturn(completedFuture(successResponseMessage));
 
         CompletableFuture<Void> future = action.resetCluster(manager, 
List.of(thisNodeName, node2.name(), node3.name()));
@@ -355,7 +357,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
     }
 
     private void respondSuccessfullyFrom(ClusterNode node) {
-        when(messagingService.invoke(eq(node), any(), anyLong()))
+        when(messagingService.invoke(eq(node), any(ResetClusterMessage.class), 
anyLong()))
                 .thenReturn(completedFuture(successResponseMessage));
     }
 
@@ -548,7 +550,7 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
         ArgumentCaptor<ResetClusterMessage> messageCaptor = 
ArgumentCaptor.forClass(ResetClusterMessage.class);
 
         when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2, 
node3));
-        when(messagingService.invoke(any(ClusterNode.class), any(), anyLong()))
+        when(messagingService.invoke(any(ClusterNode.class), 
any(ResetClusterMessage.class), anyLong()))
                 .thenReturn(completedFuture(successResponseMessage));
 
         assertThat(manager.migrate(newState), willCompleteSuccessfully());
@@ -641,13 +643,18 @@ class SystemDisasterRecoveryManagerImplTest extends 
BaseIgniteAbstractTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void initiatesBecomingMetastorageLeader(boolean 
pauseLeaderSecondaryDuties) {
-        
when(metastorageMaintenance.becomeLonelyLeader(pauseLeaderSecondaryDuties)).thenReturn(nullCompletedFuture());
+    @ValueSource(ints = {1, 3})
+    void initiatesBecomingMetastorageLeader(int targetVotingSetSize) {
+        Set<String> targetSet = IntStream.range(0, targetVotingSetSize)
+                .mapToObj(n -> "node" + n)
+                .collect(toSet());
+        when(metastorageMaintenance.becomeLonelyLeader(1, targetSet))
+                .thenReturn(nullCompletedFuture());
 
         NetworkMessageHandler handler = extractMessageHandler();
         BecomeMetastorageLeaderMessage message = 
messagesFactory.becomeMetastorageLeaderMessage()
-                .pauseLeaderSecondaryDuties(pauseLeaderSecondaryDuties)
+                .termBeforeChange(1)
+                .targetVotingSet(targetSet)
                 .build();
         handler.onReceived(message, thisNode, 123L);
 

Reply via email to