This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d3b0cc2ca6 IGNITE-23462 Also listen for node join when waiting for
nodes doing MG repair (#4574)
d3b0cc2ca6 is described below
commit d3b0cc2ca6982ff3547820a32025bc4232b9d477
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Oct 16 17:50:33 2024 +0400
IGNITE-23462 Also listen for node join when waiting for nodes doing MG
repair (#4574)
---
.../disaster/system/MetastorageRepairImpl.java | 21 ++++++++++++++++++++
.../disaster/system/MetastorageRepairImplTest.java | 23 +++++++++++++++++++++-
2 files changed, 43 insertions(+), 1 deletion(-)
diff --git a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java
index 0e61c6e48c..9efc85cd0b 100644
--- a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java
+++ b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImpl.java
@@ -35,6 +35,7 @@ import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManag
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import
org.apache.ignite.internal.disaster.system.message.BecomeMetastorageLeaderMessage;
import
org.apache.ignite.internal.disaster.system.message.StartMetastorageRepairRequest;
import
org.apache.ignite.internal.disaster.system.message.StartMetastorageRepairResponse;
@@ -101,6 +102,19 @@ public class MetastorageRepairImpl implements
MetastorageRepair {
LogicalTopologyEventListener listener = new
LogicalTopologyEventListener() {
@Override
public void onNodeValidated(LogicalNode validatedNode) {
+ LOG.info("Node (awaited by Metastorage repair) has been
validated in CMG: {}", validatedNode.name());
+
+ markNodeAsAdded(validatedNode);
+ }
+
+ @Override
+ public void onNodeJoined(LogicalNode joinedNode,
LogicalTopologySnapshot newTopology) {
+ LOG.info("Node (awaited by Metastorage repair) has joined the
cluster: {}", joinedNode.name());
+
+ markNodeAsAdded(joinedNode);
+ }
+
+ private void markNodeAsAdded(LogicalNode validatedNode) {
cumulativeValidatedNodeNames.add(validatedNode.name());
if (isSuperset(cumulativeValidatedNodeNames, nodeNames)) {
@@ -110,6 +124,8 @@ public class MetastorageRepairImpl implements
MetastorageRepair {
@Override
public void onNodeInvalidated(LogicalNode invalidatedNode) {
+ LOG.info("Node (awaited by Metastorage repair) has been
invalidated in CMG: {}", invalidatedNode.name());
+
cumulativeValidatedNodeNames.remove(invalidatedNode.name());
}
};
@@ -121,6 +137,9 @@ public class MetastorageRepairImpl implements
MetastorageRepair {
Set<String> validatedNodeNames = validatedNodes.stream()
.map(ClusterNode::name)
.collect(toSet());
+
+ LOG.info("Nodes (awaited by Metastorage repair) that are
currently validated/joined in CMG: {}", validatedNodeNames);
+
if (isSuperset(validatedNodeNames, nodeNames)) {
future.complete(null);
}
@@ -144,6 +163,8 @@ public class MetastorageRepairImpl implements
MetastorageRepair {
}
private CompletableFuture<Map<String, IndexWithTerm>>
startMetastorageRepair(Set<String> participatingNodeNames) {
+ LOG.info("Sending StartMetastorageRepair requests to {}",
participatingNodeNames);
+
StartMetastorageRepairRequest request =
messagesFactory.startMetastorageRepairRequest().build();
Map<String, CompletableFuture<StartMetastorageRepairResponse>>
responses = new HashMap<>();
diff --git a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImplTest.java b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImplTest.java
index 930f54b311..04da8bcce9 100644
--- a/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImplTest.java
+++ b/modules/system-disaster-recovery/src/test/java/org/apache/ignite/internal/disaster/system/MetastorageRepairImplTest.java
@@ -43,6 +43,7 @@ import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessage
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import
org.apache.ignite.internal.disaster.system.message.BecomeMetastorageLeaderMessage;
import
org.apache.ignite.internal.disaster.system.message.StartMetastorageRepairRequest;
import
org.apache.ignite.internal.disaster.system.message.StartMetastorageRepairResponse;
@@ -157,7 +158,7 @@ class MetastorageRepairImplTest extends
BaseIgniteAbstractTest {
}
@Test
- void proceedsIfParticipatingNodesAppearLaterThanRepairStarts() {
+ void proceedsIfParticipatingNodesAppearAsValidatedLaterThanRepairStarts() {
when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of()));
doAnswer(invocation -> {
LogicalTopologyEventListener listener = invocation.getArgument(0);
@@ -173,6 +174,26 @@ class MetastorageRepairImplTest extends
BaseIgniteAbstractTest {
assertThat(repair.repair(Set.of(node1.name()), 1), willSucceedIn(3,
SECONDS));
}
+ @Test
+ void proceedsIfParticipatingNodesAppearAsJoinedLaterThanRepairStarts() {
+
when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of()));
+ doAnswer(invocation -> {
+ LogicalTopologyEventListener listener = invocation.getArgument(0);
+
+ LogicalNode joinedNode1 = new LogicalNode(node1);
+ LogicalNode joinedNode2 = new LogicalNode(node2);
+
+ listener.onNodeJoined(joinedNode1, new LogicalTopologySnapshot(1,
Set.of(joinedNode1)));
+ listener.onNodeJoined(joinedNode2, new LogicalTopologySnapshot(2,
Set.of(joinedNode1, joinedNode2)));
+
+ return null;
+ }).when(logicalTopology).addEventListener(any());
+
+ willRespondWithIndexAndTerm(node1, 10, 1);
+
+ assertThat(repair.repair(Set.of(node1.name()), 1), willSucceedIn(3,
SECONDS));
+ }
+
@Test
void waitsTillEveryNodeResponds() {
when(cmgManager.validatedNodes()).thenReturn(completedFuture(Set.of(node1,
node2)));