This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0fc37b6e48 IGNITE-23215 Implement MetastorageRepair (#4403)
0fc37b6e48 is described below
commit 0fc37b6e48d7a2bf0ed6c8d02cce744e407ec2ef
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Sep 18 14:44:15 2024 +0400
IGNITE-23215 Implement MetastorageRepair (#4403)
---
.../management/ClusterManagementGroupManager.java | 18 ++
.../cluster/management/raft/CmgRaftService.java | 13 ++
.../impl/ItMetaStorageMaintenanceTest.java | 15 +-
.../metastorage/impl/MetaStorageManagerImpl.java | 7 +-
.../impl/MetastorageGroupMaintenance.java | 8 +-
.../apache/ignite/internal/raft/IndexWithTerm.java | 41 +++-
.../internal/raft/service/LeaderWithTerm.java | 6 +
.../ignite/internal/raft/IndexWithTermTest.java} | 32 ++--
.../apache/ignite/raft/jraft/core/NodeImpl.java | 8 +-
modules/system-disaster-recovery-api/build.gradle | 1 +
.../message/BecomeMetastorageLeaderMessage.java | 8 +-
.../MetastorageRepair.java} | 21 ++-
.../disaster/system/MetastorageRepairImpl.java | 192 +++++++++++++++++++
.../system/SystemDisasterRecoveryManagerImpl.java | 25 ++-
.../disaster/system/MetastorageRepairImplTest.java | 209 +++++++++++++++++++++
.../SystemDisasterRecoveryManagerImplTest.java | 29 +--
16 files changed, 572 insertions(+), 61 deletions(-)
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index a456fe8bf3..08ca0d36c9 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -1057,6 +1057,24 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
}
}
+ /**
+ * Changes metastorage nodes in the CMG.
+ *
+ * @return Future that completes when the command is executed by the CMG.
+ */
+ public CompletableFuture<Void> changeMetastorageNodes(Set<String>
newMetastorageNodes) {
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new NodeStoppingException());
+ }
+
+ try {
+ return raftServiceAfterJoin()
+ .thenCompose(service ->
service.changeMetastorageNodes(newMetastorageNodes));
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
/**
* Returns a future resolving to the initial cluster configuration in
HOCON format. The resulting configuration may be {@code null} if
* not provided by the user.
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
index b89322943a..b41948903c 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
@@ -32,6 +32,7 @@ import
org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.ClusterTag;
import org.apache.ignite.internal.cluster.management.NodeAttributes;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import
org.apache.ignite.internal.cluster.management.raft.commands.ChangeMetastorageNodesCommand;
import
org.apache.ignite.internal.cluster.management.raft.commands.ClusterNodeMessage;
import
org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand;
import
org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
@@ -324,6 +325,18 @@ public class CmgRaftService implements ManuallyCloseable {
}
}
+ /**
+ * Changes Metastorage nodes.
+ *
+ * @return Future that completes when the change is finished.
+ */
+ public CompletableFuture<Void> changeMetastorageNodes(Set<String>
newMetastorageNodes) {
+ ChangeMetastorageNodesCommand command =
msgFactory.changeMetastorageNodesCommand()
+ .metaStorageNodes(Set.copyOf(newMetastorageNodes))
+ .build();
+ return raftService.run(command);
+ }
+
@Override
public void close() {
raftService.shutdown();
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
index f7acce025e..6705f543e1 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
@@ -58,11 +59,15 @@ class ItMetaStorageMaintenanceTest extends
ItMetaStorageMultipleNodesAbstractTes
// Metastorage does not work anymore.
assertThatMetastorageHasNoMajority(node0);
- assertThat(node0.metaStorageManager.becomeLonelyLeader(true),
willCompleteSuccessfully());
+ assertThat(node0.metaStorageManager.becomeLonelyLeader(1,
allNodeNames()), willCompleteSuccessfully());
assertThatMetastorageHasMajority(node0);
}
+ private Set<String> allNodeNames() {
+ return nodes.stream().map(Node::name).collect(toSet());
+ }
+
/**
* Starts 3 voting Raft nodes. 'Voting' here is opposed to a 'learner'
node which does not vote.
*
@@ -110,7 +115,7 @@ class ItMetaStorageMaintenanceTest extends
ItMetaStorageMultipleNodesAbstractTes
// Stop the majority.
stopAllNodesExcept0();
- assertThat(node0.metaStorageManager.becomeLonelyLeader(true),
willCompleteSuccessfully());
+ assertThat(node0.metaStorageManager.becomeLonelyLeader(1,
allNodeNames()), willCompleteSuccessfully());
Node node3 = startNode();
@@ -130,7 +135,7 @@ class ItMetaStorageMaintenanceTest extends
ItMetaStorageMultipleNodesAbstractTes
// Stop the majority.
stopAllNodesExcept0();
- assertThat(node0.metaStorageManager.becomeLonelyLeader(false),
willCompleteSuccessfully());
+ assertThat(node0.metaStorageManager.becomeLonelyLeader(1,
Set.of(node0.name())), willCompleteSuccessfully());
Node node3 = startNode();
@@ -159,7 +164,7 @@ class ItMetaStorageMaintenanceTest extends
ItMetaStorageMultipleNodesAbstractTes
// Stop the majority.
stopAllNodesExcept0();
- assertThat(node0.metaStorageManager.becomeLonelyLeader(true),
willCompleteSuccessfully());
+ assertThat(node0.metaStorageManager.becomeLonelyLeader(1,
allNodeNames()), willCompleteSuccessfully());
ClusterTime clusterTime0 = node0.metaStorageManager.clusterTime();
@@ -198,7 +203,7 @@ class ItMetaStorageMaintenanceTest extends
ItMetaStorageMultipleNodesAbstractTes
// Stop the majority.
stopAllNodesExcept0();
- assertThat(node0.metaStorageManager.becomeLonelyLeader(false),
willCompleteSuccessfully());
+ assertThat(node0.metaStorageManager.becomeLonelyLeader(1,
Set.of(node0.name())), willCompleteSuccessfully());
ClusterTime clusterTime0 = node0.metaStorageManager.clusterTime();
HybridTimestamp timeBeforeOp = clusterTime0.currentSafeTime();
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 7660bdf3d6..7688a2fe11 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -157,7 +157,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
*/
private volatile boolean leaderSecondaryDutiesPaused = false;
- /** Protects {@link #becomeLonelyLeader(boolean)} from concurrent
executions. */
+ /** Protects {@link #becomeLonelyLeader(long, Set)} from concurrent
executions. */
private final Object becomeLonelyLeaderMutex = new Object();
/**
@@ -888,10 +888,11 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
}
@Override
- public CompletableFuture<Void> becomeLonelyLeader(boolean
pauseLeaderSecondaryDuties) {
+ public CompletableFuture<Void> becomeLonelyLeader(long termBeforeChange,
Set<String> targetVotingSet) {
+ // TODO: IGNITE-22899 - use both parameters.
return inBusyLockAsync(busyLock, () -> {
synchronized (becomeLonelyLeaderMutex) {
- leaderSecondaryDutiesPaused = pauseLeaderSecondaryDuties;
+ leaderSecondaryDutiesPaused = targetVotingSet.size() > 1;
RaftNodeId raftNodeId = raftNodeId();
PeersAndLearners newConfiguration =
PeersAndLearners.fromPeers(Set.of(raftNodeId.peer()), emptySet());
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetastorageGroupMaintenance.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetastorageGroupMaintenance.java
index d1acc34e4d..a600b7e4d6 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetastorageGroupMaintenance.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetastorageGroupMaintenance.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.metastorage.impl;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.raft.IndexWithTerm;
@@ -35,8 +36,9 @@ public interface MetastorageGroupMaintenance {
/**
* Makes this node a leader of the Metastorage group (with the voting set
containing of just this node).
*
- * @param pauseLeaderSecondaryDuties Whether leader secondary duties
(managing learners, propagating idle safe time) should be paused.
- * @return Future which completes when this node becomes a leader.
+ * @param termBeforeChange Term that Metastorage on the target node has
before we make it become the leader.
+ * @param targetVotingSet Voting set members which we want to achieve
(becoming a leader is just the first step in doing so).
+ * @return Future which completes when new leader becomes available.
*/
- CompletableFuture<Void> becomeLonelyLeader(boolean
pauseLeaderSecondaryDuties);
+ CompletableFuture<Void> becomeLonelyLeader(long termBeforeChange,
Set<String> targetVotingSet);
}
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
index b95be4417b..3c40d50546 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.raft;
/**
* Raft index with the corresponding term.
*/
-public class IndexWithTerm {
+public class IndexWithTerm implements Comparable<IndexWithTerm> {
private final long index;
private final long term;
@@ -39,4 +39,36 @@ public class IndexWithTerm {
     public long term() {
         return term;
     }
+
+    /**
+     * Compares by term first and, only when terms are equal, by index.
+     */
+    @Override
+    public int compareTo(IndexWithTerm that) {
+        return term != that.term ? Long.compare(term, that.term) : Long.compare(index, that.index);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        IndexWithTerm that = (IndexWithTerm) o;
+        return index == that.index && term == that.term;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * Long.hashCode(index) + Long.hashCode(term);
+    }
+
+    /** Returns a compact {@code index:term} representation. */
+    @Override
+    public String toString() {
+        return index + ":" + term;
+    }
 }
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java
index d22ae5e091..450784cf13 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/LeaderWithTerm.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.raft.service;
import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
/**
@@ -53,4 +54,9 @@ public class LeaderWithTerm {
public long term() {
return term;
}
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
}
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
b/modules/raft-api/src/test/java/org/apache/ignite/internal/raft/IndexWithTermTest.java
similarity index 52%
copy from
modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
copy to
modules/raft-api/src/test/java/org/apache/ignite/internal/raft/IndexWithTermTest.java
index b95be4417b..0e35f4a244 100644
---
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/IndexWithTerm.java
+++
b/modules/raft-api/src/test/java/org/apache/ignite/internal/raft/IndexWithTermTest.java
@@ -17,26 +17,22 @@
package org.apache.ignite.internal.raft;
-/**
- * Raft index with the corresponding term.
- */
-public class IndexWithTerm {
- private final long index;
- private final long term;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
- /** Constructor. */
- public IndexWithTerm(long index, long term) {
- this.index = index;
- this.term = term;
- }
+import org.junit.jupiter.api.Test;
- /** Returns the index. */
- public long index() {
- return index;
- }
+class IndexWithTermTest {
+ @Test
+ void compareToComparesByTermThenByIndex() {
+ assertThat(new IndexWithTerm(5, 1).compareTo(new IndexWithTerm(3, 1)),
is(greaterThan(0)));
+
+ assertThat(new IndexWithTerm(3, 1).compareTo(new IndexWithTerm(3, 2)),
is(lessThan(0)));
+ assertThat(new IndexWithTerm(5, 1).compareTo(new IndexWithTerm(3, 2)),
is(lessThan(0)));
+ assertThat(new IndexWithTerm(5, 1).compareTo(new IndexWithTerm(5, 2)),
is(lessThan(0)));
- /** Returns the term corresponding to the index. */
- public long term() {
- return term;
+ assertThat(new IndexWithTerm(5, 1).compareTo(new IndexWithTerm(5, 1)),
is(0));
}
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 3bfbae526b..4b7fb58408 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -2778,13 +2778,7 @@ public class NodeImpl implements Node, RaftServerService
{
@Override
public long lastLogIndex() {
- this.readLock.lock();
- try {
- return logManager.getLastLogIndex();
- }
- finally {
- this.readLock.unlock();
- }
+ return lastLogIndexAndTerm().getIndex();
}
@Override
diff --git a/modules/system-disaster-recovery-api/build.gradle
b/modules/system-disaster-recovery-api/build.gradle
index 4c93ec2153..ac1901adfc 100644
--- a/modules/system-disaster-recovery-api/build.gradle
+++ b/modules/system-disaster-recovery-api/build.gradle
@@ -26,6 +26,7 @@ dependencies {
implementation project(':ignite-core')
implementation project(':ignite-network-api')
implementation project(':ignite-vault')
+ implementation project(':ignite-raft-api')
implementation libs.jetbrains.annotations
testImplementation libs.hamcrest.core
diff --git
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/BecomeMetastorageLeaderMessage.java
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/BecomeMetastorageLeaderMessage.java
index 3d96e5670f..c89db79c5e 100644
---
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/BecomeMetastorageLeaderMessage.java
+++
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/BecomeMetastorageLeaderMessage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.disaster.system.message;
+import java.util.Set;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;
@@ -25,6 +26,9 @@ import
org.apache.ignite.internal.network.annotations.Transferable;
*/
@Transferable(SystemDisasterRecoveryMessageGroup.BECOME_METASTORAGE_LEADER)
public interface BecomeMetastorageLeaderMessage extends NetworkMessage {
- /** Whether leader secondary duties (managing learners, propagating idle
safe time) should be paused. */
- boolean pauseLeaderSecondaryDuties();
+ /** Term that Metastorage on the target node has before we make it become
the leader. */
+ long termBeforeChange();
+
+ /** Voting set members (aka peers) which we want to achieve (becoming a
leader is just the first step in doing so). */
+ Set<String> targetVotingSet();
}
diff --git
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/BecomeMetastorageLeaderMessage.java
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/repair/MetastorageRepair.java
similarity index 56%
copy from
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/BecomeMetastorageLeaderMessage.java
copy to
modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/repair/MetastorageRepair.java
index 3d96e5670f..8abab1b601 100644
---
a/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/message/BecomeMetastorageLeaderMessage.java
+++
b/modules/system-disaster-recovery-api/src/main/java/org/apache/ignite/internal/disaster/system/repair/MetastorageRepair.java
@@ -15,16 +15,21 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.disaster.system.message;
+package org.apache.ignite.internal.disaster.system.repair;
-import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.network.annotations.Transferable;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
/**
- * A command to make a node become a Metastorage Raft group leader.
+ * Metastorage repair logic.
*/
-@Transferable(SystemDisasterRecoveryMessageGroup.BECOME_METASTORAGE_LEADER)
-public interface BecomeMetastorageLeaderMessage extends NetworkMessage {
- /** Whether leader secondary duties (managing learners, propagating idle
safe time) should be paused. */
- boolean pauseLeaderSecondaryDuties();
+public interface MetastorageRepair {
+ /**
+ * Performs Metastorage repair.
+ *
+ * @param participatingNodeNames Names of the nodes that participate in
the repair.
+ * @param metastorageReplicationFactor Replication factor for Metastorage
requested by the user.
+ * @return Future that gets completed when repair is initiated.
+ */
+ CompletableFuture<Void> repair(Set<String> participatingNodeNames, int
metastorageReplicationFactor);
}
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java
new file mode 100644
index 0000000000..9d4b13e48d
--- /dev/null
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.util.CollectionUtils.difference;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import org.apache.ignite.internal.disaster.system.message.BecomeMetastorageLeaderMessage;
+import org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermRequestMessage;
+import org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage;
+import org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.disaster.system.repair.MetastorageRepair;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.raft.IndexWithTerm;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Implementation of {@link MetastorageRepair}.
+ */
+public class MetastorageRepairImpl implements MetastorageRepair {
+    private static final IgniteLogger LOG = Loggers.forClass(MetastorageRepairImpl.class);
+
+    /** Number of seconds to wait for nodes participating in a repair to appear. */
+    private static final long WAIT_FOR_NODES_SECONDS = 60;
+
+    /** Timeout (in milliseconds) for every request sent to a node participating in a repair. */
+    private static final int NETWORK_TIMEOUT_MILLIS = 10_000;
+
+    private final MessagingService messagingService;
+    private final LogicalTopology logicalTopology;
+    private final ClusterManagementGroupManager cmgManager;
+
+    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new SystemDisasterRecoveryMessagesFactory();
+
+    /** Constructor. */
+    public MetastorageRepairImpl(
+            MessagingService messagingService,
+            LogicalTopology logicalTopology,
+            ClusterManagementGroupManager cmgManager
+    ) {
+        this.messagingService = messagingService;
+        this.logicalTopology = logicalTopology;
+        this.cmgManager = cmgManager;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Waits (up to {@link #WAIT_FOR_NODES_SECONDS} seconds) for all participating nodes to be validated, collects the Metastorage
+     * Raft index/term from each of them, picks up to {@code metastorageReplicationFactor} nodes with the greatest index/term as the
+     * new Metastorage voting set, records that set in the CMG and then makes the node with the greatest index/term the Metastorage
+     * leader.
+     */
+    @Override
+    public CompletableFuture<Void> repair(Set<String> participatingNodeNames, int metastorageReplicationFactor) {
+        if (participatingNodeNames.isEmpty()) {
+            return CompletableFuture.failedFuture(new IllegalArgumentException("No nodes participate in the Metastorage repair"));
+        }
+        if (metastorageReplicationFactor < 1) {
+            return CompletableFuture.failedFuture(
+                    new IllegalArgumentException("Metastorage replication factor must be positive: " + metastorageReplicationFactor)
+            );
+        }
+
+        LOG.info(
+                "Starting MG repair [participatingNodes={}, replicationFactor={}].",
+                participatingNodeNames,
+                metastorageReplicationFactor
+        );
+
+        return waitTillValidatedNodesContain(participatingNodeNames)
+                .thenCompose(unused -> collectMetastorageIndexes(participatingNodeNames))
+                .thenCompose(indexes -> {
+                    LOG.info("Collected metastorage indexes [indexes={}].", indexes);
+
+                    Set<String> newMgNodes = nodesWithBestIndexes(indexes, metastorageReplicationFactor);
+                    LOG.info("Chose new MG nodes [mgNodes={}].", newMgNodes);
+
+                    String bestNodeName = chooseNodeWithBestIndex(indexes, newMgNodes);
+                    LOG.info("Chose best MG node [node={}].", bestNodeName);
+
+                    return cmgManager.changeMetastorageNodes(newMgNodes)
+                            .thenCompose(unused -> appointLeader(bestNodeName, indexes.get(bestNodeName).term(), newMgNodes))
+                            .thenRun(() -> LOG.info("Appointed MG leader forcefully [leader={}].", bestNodeName));
+                });
+    }
+
+    /**
+     * Returns a future that completes when every node from {@code nodeNames} has been seen as validated, either in the snapshot
+     * returned by the CMG or via a later logical topology event. Fails with {@link TimeoutException} if that does not happen within
+     * {@link #WAIT_FOR_NODES_SECONDS} seconds, or with the cause of failure if the CMG snapshot cannot be obtained.
+     */
+    private CompletableFuture<Void> waitTillValidatedNodesContain(Set<String> nodeNames) {
+        Set<String> cumulativeValidatedNodeNames = ConcurrentHashMap.newKeySet();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        LogicalTopologyEventListener listener = new LogicalTopologyEventListener() {
+            @Override
+            public void onNodeValidated(LogicalNode validatedNode) {
+                cumulativeValidatedNodeNames.add(validatedNode.name());
+
+                if (isSuperset(cumulativeValidatedNodeNames, nodeNames)) {
+                    future.complete(null);
+                }
+            }
+
+            @Override
+            public void onNodeInvalidated(LogicalNode invalidatedNode) {
+                cumulativeValidatedNodeNames.remove(invalidatedNode.name());
+            }
+        };
+
+        // Subscribe before requesting the snapshot so that validations happening in between are not lost.
+        logicalTopology.addEventListener(listener);
+
+        cmgManager.validatedNodes()
+                .whenComplete((validatedNodes, ex) -> {
+                    if (ex != null) {
+                        future.completeExceptionally(ex);
+                        return;
+                    }
+
+                    // Merge the snapshot into the accumulated set first and only then check it: events received while the snapshot
+                    // was being obtained are already in the set, and checking the snapshot alone could miss the moment when the union
+                    // covers all required nodes (no further event would re-trigger the check, so we would just time out).
+                    for (ClusterNode node : validatedNodes) {
+                        cumulativeValidatedNodeNames.add(node.name());
+                    }
+
+                    if (isSuperset(cumulativeValidatedNodeNames, nodeNames)) {
+                        future.complete(null);
+                    }
+                });
+
+        return future
+                .orTimeout(WAIT_FOR_NODES_SECONDS, SECONDS)
+                .whenComplete((res, ex) -> {
+                    logicalTopology.removeEventListener(listener);
+
+                    if (ex instanceof TimeoutException) {
+                        LOG.error("Did not see all participating nodes online in time, failing Metastorage repair, please try again", ex);
+                    }
+                });
+    }
+
+    /** Returns {@code true} if {@code container} contains every element of {@code containee}. */
+    private static boolean isSuperset(Set<String> container, Set<String> containee) {
+        return difference(containee, container).isEmpty();
+    }
+
+    /**
+     * Requests the Metastorage Raft index and term from every participating node and returns them keyed by node name. Each
+     * request is sent with a {@link #NETWORK_TIMEOUT_MILLIS} timeout; the returned future fails if any request fails.
+     */
+    private CompletableFuture<Map<String, IndexWithTerm>> collectMetastorageIndexes(Set<String> participatingNodeNames) {
+        MetastorageIndexTermRequestMessage request = messagesFactory.metastorageIndexTermRequestMessage().build();
+
+        Map<String, CompletableFuture<MetastorageIndexTermResponseMessage>> responses = new HashMap<>();
+
+        for (String nodeName : participatingNodeNames) {
+            responses.put(
+                    nodeName,
+                    messagingService.invoke(nodeName, request, NETWORK_TIMEOUT_MILLIS)
+                            .thenApply(MetastorageIndexTermResponseMessage.class::cast)
+            );
+        }
+
+        return allOf(responses.values()).thenApply(unused -> responses.entrySet().stream()
+                .collect(toMap(Entry::getKey, entry -> indexWithTerm(entry.getValue().join()))));
+    }
+
+    private static IndexWithTerm indexWithTerm(MetastorageIndexTermResponseMessage message) {
+        return new IndexWithTerm(message.raftIndex(), message.raftTerm());
+    }
+
+    /** Returns names of at most {@code metastorageReplicationFactor} nodes having the greatest index/term. */
+    private static Set<String> nodesWithBestIndexes(Map<String, IndexWithTerm> indexes, int metastorageReplicationFactor) {
+        return indexes.entrySet().stream()
+                .sorted(Entry.<String, IndexWithTerm>comparingByValue().reversed())
+                .limit(metastorageReplicationFactor)
+                .map(Entry::getKey)
+                .collect(toSet());
+    }
+
+    /** Returns the name of the node from {@code newMgNodes} having the greatest index/term. */
+    private static String chooseNodeWithBestIndex(Map<String, IndexWithTerm> indexes, Set<String> newMgNodes) {
+        return newMgNodes.stream()
+                .max(Comparator.comparing(indexes::get))
+                .orElseThrow();
+    }
+
+    /**
+     * Asks the given node to become the Metastorage leader, passing it the term it had before the change and the voting set which
+     * should eventually be achieved.
+     */
+    private CompletableFuture<Void> appointLeader(String bestNodeName, long termBeforeChange, Set<String> newMgNodes) {
+        LOG.info("Appointing MG leader forcefully [leader={}].", bestNodeName);
+
+        BecomeMetastorageLeaderMessage request = messagesFactory.becomeMetastorageLeaderMessage()
+                .termBeforeChange(termBeforeChange)
+                .targetVotingSet(Set.copyOf(newMgNodes))
+                .build();
+
+        return messagingService.invoke(bestNodeName, request, NETWORK_TIMEOUT_MILLIS)
+                .thenApply(message -> null);
+    }
+}
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
index 1c51615984..ce190d3090 100644
---
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImpl.java
@@ -46,6 +46,8 @@ import
org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
import
org.apache.ignite.internal.disaster.system.message.ResetClusterMessageBuilder;
import
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessageGroup;
import
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.impl.MetastorageGroupMaintenance;
@@ -61,6 +63,8 @@ import org.jetbrains.annotations.Nullable;
* Implementation of {@link SystemDisasterRecoveryManager}.
*/
public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecoveryManager, IgniteComponent {
+ private static final IgniteLogger LOG =
Loggers.forClass(SystemDisasterRecoveryManagerImpl.class);
+
private final String thisNodeName;
private final TopologyService topologyService;
private final MessagingService messagingService;
@@ -121,7 +125,12 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
if (!thisNodeName.equals(sender.name())) {
restarter.initiateRestart();
}
- }, restartExecutor);
+ }, restartExecutor)
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.error("Error when handling a
ResetClusterMessage", ex);
+ }
+ });
});
}
@@ -138,12 +147,22 @@ public class SystemDisasterRecoveryManagerImpl implements
SystemDisasterRecovery
.build();
messagingService.respond(sender, response, correlationId);
+ })
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.error("Error when handling a
MetastorageIndexTermRequestMessage", ex);
+ }
});
}
private void
handleBecomeMetastorageLeaderMessage(BecomeMetastorageLeaderMessage message,
ClusterNode sender, long correlationId) {
-
metastorageGroupMaintenance.becomeLonelyLeader(message.pauseLeaderSecondaryDuties())
- .thenRun(() -> messagingService.respond(sender,
successResponseMessage(), correlationId));
+
metastorageGroupMaintenance.becomeLonelyLeader(message.termBeforeChange(),
message.targetVotingSet())
+ .thenRun(() -> messagingService.respond(sender,
successResponseMessage(), correlationId))
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ LOG.error("Error when handling a
BecomeMetastorageLeaderMessage", ex);
+ }
+ });
}
@Override
diff --git
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImplTest.java
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImplTest.java
new file mode 100644
index 0000000000..28e4cad5b9
--- /dev/null
+++
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImplTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.disaster.system;
+
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import
org.apache.ignite.internal.disaster.system.message.BecomeMetastorageLeaderMessage;
+import
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermRequestMessage;
+import
org.apache.ignite.internal.disaster.system.message.MetastorageIndexTermResponseMessage;
+import
org.apache.ignite.internal.disaster.system.message.SystemDisasterRecoveryMessagesFactory;
+import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link MetastorageRepairImpl}: waiting for participating nodes, collecting their Raft index/term,
+ * choosing the new Metastorage group nodes and leader, and sending the leader a {@link BecomeMetastorageLeaderMessage}.
+ */
+@ExtendWith(MockitoExtension.class)
+class MetastorageRepairImplTest extends BaseIgniteAbstractTest {
+    @Mock
+    private MessagingService messagingService;
+
+    @Mock
+    private LogicalTopology logicalTopology;
+
+    @Mock
+    private ClusterManagementGroupManager cmgManager;
+
+    @InjectMocks
+    private MetastorageRepairImpl repair;
+
+    @Captor
+    private ArgumentCaptor<Set<String>> mgNodesCaptor;
+
+    @Captor
+    private ArgumentCaptor<BecomeMetastorageLeaderMessage> becomeLeaderMessageCaptor;
+
+    private final SystemDisasterRecoveryMessagesFactory messagesFactory = new SystemDisasterRecoveryMessagesFactory();
+    private final CmgMessagesFactory cmgMessagesFactory = new CmgMessagesFactory();
+
+    private final ClusterNode node1 = new ClusterNodeImpl(randomUUID().toString(), "node1", new NetworkAddress("host", 1001));
+    private final ClusterNode node2 = new ClusterNodeImpl(randomUUID().toString(), "node2", new NetworkAddress("host", 1002));
+    private final ClusterNode node3 = new ClusterNodeImpl(randomUUID().toString(), "node3", new NetworkAddress("host", 1003));
+    private final ClusterNode node4 = new ClusterNodeImpl(randomUUID().toString(), "node4", new NetworkAddress("host", 1004));
+
+    /**
+     * Leniently stubs the collaborators so that any {@link BecomeMetastorageLeaderMessage} invocation succeeds and
+     * {@code changeMetastorageNodes()} completes immediately. Each test stubs only the index/term responses it needs.
+     */
+    @BeforeEach
+    void configureMocks() {
+        lenient().when(messagingService.invoke(any(String.class), any(BecomeMetastorageLeaderMessage.class), anyLong()))
+                .thenReturn(completedFuture(cmgMessagesFactory.successResponseMessage().build()));
+        lenient().when(cmgManager.changeMetastorageNodes(any()))
+                .thenReturn(nullCompletedFuture());
+    }
+
+    /** Repair must not complete while none of the participating nodes has been validated. */
+    @Test
+    void hangsIfParticipatingNodesNeverAppear() {
+        when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of()));
+
+        assertThat(repair.repair(Set.of(node1.name(), node2.name()), 1), willTimeoutIn(100, MILLISECONDS));
+    }
+
+    /** With a single participating node and MG factor 1, that node becomes both the only MG node and the leader. */
+    @Test
+    void repairsWithMgFactor1() {
+        when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of(node1)));
+
+        willRespondWithIndexAndTerm(node1, 10, 1);
+
+        assertThat(repair.repair(Set.of(node1.name()), 1), willSucceedIn(3, SECONDS));
+
+        verify(cmgManager).changeMetastorageNodes(mgNodesCaptor.capture());
+        assertThat(mgNodesCaptor.getValue(), contains(node1.name()));
+
+        verify(messagingService).invoke(eq(node1.name()), becomeLeaderMessageCaptor.capture(), anyLong());
+        assertThat(becomeLeaderMessageCaptor.getValue().termBeforeChange(), is(1L));
+        assertThat(becomeLeaderMessageCaptor.getValue().targetVotingSet(), is(Set.of(node1.name())));
+    }
+
+    /** With MG factor 3, all three participants become MG nodes and the most up-to-date one (node1) is the leader. */
+    @Test
+    void repairsWithMgFactor3() {
+        when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of(node1, node2, node3, node4)));
+
+        willRespondWithIndexAndTerm(node1, 12, 1);
+        willRespondWithIndexAndTerm(node2, 11, 1);
+        willRespondWithIndexAndTerm(node3, 10, 1);
+
+        Set<String> threeNodes = Set.of(node1.name(), node2.name(), node3.name());
+        assertThat(repair.repair(threeNodes, 3), willSucceedIn(3, SECONDS));
+
+        verify(cmgManager).changeMetastorageNodes(mgNodesCaptor.capture());
+        assertThat(mgNodesCaptor.getValue(), containsInAnyOrder(node1.name(), node2.name(), node3.name()));
+
+        verify(messagingService).invoke(eq(node1.name()), becomeLeaderMessageCaptor.capture(), anyLong());
+        assertThat(becomeLeaderMessageCaptor.getValue().termBeforeChange(), is(1L));
+        assertThat(becomeLeaderMessageCaptor.getValue().targetVotingSet(), is(threeNodes));
+    }
+
+    /** Nodes that get validated only after repair has started (reported through the topology listener) are picked up. */
+    @Test
+    void proceedsIfParticipatingNodesAppearLaterThanRepairStarts() {
+        when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of()));
+        doAnswer(invocation -> {
+            LogicalTopologyEventListener listener = invocation.getArgument(0);
+
+            listener.onNodeValidated(new LogicalNode(node1));
+            listener.onNodeValidated(new LogicalNode(node2));
+
+            return null;
+        }).when(logicalTopology).addEventListener(any());
+
+        willRespondWithIndexAndTerm(node1, 10, 1);
+
+        assertThat(repair.repair(Set.of(node1.name()), 1), willSucceedIn(3, SECONDS));
+    }
+
+    /** Repair must not complete until every participating node has answered the index/term request. */
+    @Test
+    void waitsTillEveryNodeResponds() {
+        when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of(node1, node2)));
+
+        willRespondWithIndexAndTerm(node1, 10, 1);
+        // Only the index/term request to node2 is left hanging; other messages keep their default stubbing.
+        when(messagingService.invoke(eq(node2.name()), any(MetastorageIndexTermRequestMessage.class), anyLong()))
+                .thenReturn(new CompletableFuture<>());
+
+        assertThat(repair.repair(Set.of(node1.name(), node2.name()), 1), willTimeoutIn(100, MILLISECONDS));
+    }
+
+    /** With equal terms, the nodes with the highest indices become MG nodes and the highest one becomes the leader. */
+    @Test
+    void choosesBestMgNodesAndLeader() {
+        when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of(node1, node2, node3)));
+
+        willRespondWithIndexAndTerm(node1, 10, 1);
+        willRespondWithIndexAndTerm(node2, 12, 1);
+        willRespondWithIndexAndTerm(node3, 11, 1);
+
+        assertThat(repair.repair(Set.of(node1.name(), node2.name(), node3.name()), 2), willSucceedIn(3, SECONDS));
+
+        verify(cmgManager).changeMetastorageNodes(mgNodesCaptor.capture());
+        assertThat(mgNodesCaptor.getValue(), containsInAnyOrder(node2.name(), node3.name()));
+
+        verify(messagingService).invoke(eq(node2.name()), any(BecomeMetastorageLeaderMessage.class), anyLong());
+    }
+
+    /** A higher Raft term wins over a higher Raft index when ranking candidates. */
+    @Test
+    void termIsStrongerThanIndexWhenComparing() {
+        when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of(node1, node2, node3)));
+
+        willRespondWithIndexAndTerm(node1, 10, 3);
+        willRespondWithIndexAndTerm(node2, 12, 1);
+        willRespondWithIndexAndTerm(node3, 11, 2);
+
+        assertThat(repair.repair(Set.of(node1.name(), node2.name(), node3.name()), 2), willSucceedIn(3, SECONDS));
+
+        verify(cmgManager).changeMetastorageNodes(mgNodesCaptor.capture());
+        assertThat(mgNodesCaptor.getValue(), containsInAnyOrder(node1.name(), node3.name()));
+
+        verify(messagingService).invoke(eq(node1.name()), any(BecomeMetastorageLeaderMessage.class), anyLong());
+    }
+
+    /** Stubs {@code node} to answer any {@link MetastorageIndexTermRequestMessage} with the given Raft index and term. */
+    private void willRespondWithIndexAndTerm(ClusterNode node, int raftIndex, int raftTerm) {
+        when(messagingService.invoke(eq(node.name()), any(MetastorageIndexTermRequestMessage.class), anyLong()))
+                .thenReturn(completedFuture(indexTermResponse(raftIndex, raftTerm)));
+    }
+
+    /** Builds a {@link MetastorageIndexTermResponseMessage} carrying the given Raft index and term. */
+    private MetastorageIndexTermResponseMessage indexTermResponse(int raftIndex, int raftTerm) {
+        return messagesFactory.metastorageIndexTermResponseMessage().raftIndex(raftIndex).raftTerm(raftTerm).build();
+    }
+}
diff --git
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
index bddfd1e467..f780a2c48e 100644
---
a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
+++
b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryManagerImplTest.java
@@ -21,6 +21,7 @@ import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.cluster.management.ClusterTag.randomClusterTag;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrow;
@@ -56,6 +57,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.cluster.management.ClusterState;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import
org.apache.ignite.internal.cluster.management.network.messages.SuccessResponseMessage;
@@ -228,7 +230,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
@EnumSource(ResetCluster.class)
void resetClusterRequiresClusterState(ResetCluster action) {
when(topologyService.allMembers()).thenReturn(List.of(thisNode));
- markinitConfigApplied();
+ markInitConfigApplied();
ClusterResetException ex =
assertWillThrow(action.resetCluster(manager, List.of(thisNodeName)),
ClusterResetException.class);
assertThat(ex.getMessage(), is("Node does not have cluster state."));
@@ -248,7 +250,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
vaultManager.put(CLUSTER_STATE_VAULT_KEY, toBytes(usualClusterState));
}
- private void markinitConfigApplied() {
+ /** Writes an empty value under {@code INIT_CONFIG_APPLIED_VAULT_KEY} into the vault. */
+ private void markInitConfigApplied() {
vaultManager.put(INIT_CONFIG_APPLIED_VAULT_KEY, BYTE_EMPTY_ARRAY);
}
@@ -260,7 +262,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2,
node3));
prepareNodeStateForClusterReset();
- when(messagingService.invoke(any(ClusterNode.class), any(), anyLong()))
+ when(messagingService.invoke(any(ClusterNode.class),
any(ResetClusterMessage.class), anyLong()))
.thenReturn(completedFuture(successResponseMessage));
CompletableFuture<Void> future = action.resetCluster(manager,
List.of(thisNodeName, node2.name()));
@@ -283,7 +285,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
}
+ /** Prepares the vault for a cluster reset by calling {@code markInitConfigApplied()} and {@code putClusterState()}. */
private void prepareNodeStateForClusterReset() {
- markinitConfigApplied();
+ markInitConfigApplied();
putClusterState();
}
@@ -312,7 +314,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2,
node3));
prepareNodeStateForClusterReset();
- when(messagingService.invoke(any(ClusterNode.class), any(), anyLong()))
+ when(messagingService.invoke(any(ClusterNode.class),
any(ResetClusterMessage.class), anyLong()))
.thenReturn(completedFuture(successResponseMessage));
CompletableFuture<Void> future = action.resetCluster(manager,
List.of(thisNodeName, node2.name(), node3.name()));
@@ -355,7 +357,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
}
+ /** Stubs {@code messagingService} so that any {@link ResetClusterMessage} sent to {@code node} completes with {@code successResponseMessage}. */
private void respondSuccessfullyFrom(ClusterNode node) {
- when(messagingService.invoke(eq(node), any(), anyLong()))
+ when(messagingService.invoke(eq(node), any(ResetClusterMessage.class),
anyLong()))
.thenReturn(completedFuture(successResponseMessage));
}
@@ -548,7 +550,7 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
ArgumentCaptor<ResetClusterMessage> messageCaptor =
ArgumentCaptor.forClass(ResetClusterMessage.class);
when(topologyService.allMembers()).thenReturn(List.of(thisNode, node2,
node3));
- when(messagingService.invoke(any(ClusterNode.class), any(), anyLong()))
+ when(messagingService.invoke(any(ClusterNode.class),
any(ResetClusterMessage.class), anyLong()))
.thenReturn(completedFuture(successResponseMessage));
assertThat(manager.migrate(newState), willCompleteSuccessfully());
@@ -641,13 +643,18 @@ class SystemDisasterRecoveryManagerImplTest extends
BaseIgniteAbstractTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
- void initiatesBecomingMetastorageLeader(boolean
pauseLeaderSecondaryDuties) {
-
when(metastorageMaintenance.becomeLonelyLeader(pauseLeaderSecondaryDuties)).thenReturn(nullCompletedFuture());
+ @ValueSource(ints = {1, 3})
+ void initiatesBecomingMetastorageLeader(int targetVotingSetSize) {
+ Set<String> targetSet = IntStream.range(0, targetVotingSetSize)
+ .mapToObj(n -> "node" + n)
+ .collect(toSet());
+ when(metastorageMaintenance.becomeLonelyLeader(1, targetSet))
+ .thenReturn(nullCompletedFuture());
NetworkMessageHandler handler = extractMessageHandler();
BecomeMetastorageLeaderMessage message =
messagesFactory.becomeMetastorageLeaderMessage()
- .pauseLeaderSecondaryDuties(pauseLeaderSecondaryDuties)
+ .termBeforeChange(1)
+ .targetVotingSet(targetSet)
.build();
handler.onReceived(message, thisNode, 123L);