This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0eb8ea4ba1 IGNITE-22783 Handle CMG-related network messages in CMG thread pool (#4467)
0eb8ea4ba1 is described below

commit 0eb8ea4ba14665f40028522970e612a74b25514d
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Sep 27 19:15:31 2024 +0400

    IGNITE-22783 Handle CMG-related network messages in CMG thread pool (#4467)
---
 .../management/ClusterManagementGroupManager.java       |  2 +-
 .../disaster/system/ItCmgDisasterRecoveryTest.java      |  2 +-
 .../system/ItSystemGroupDisasterRecoveryTest.java       | 17 +++++++++++++++--
 3 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 08f6250118..a7350f9c91 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -198,7 +198,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
 
         cmgMessageHandler = createMessageHandler();
 
-        clusterService.messagingService().addMessageHandler(CmgMessageGroup.class, cmgMessageHandler);
+        clusterService.messagingService().addMessageHandler(CmgMessageGroup.class, message -> scheduledExecutor, cmgMessageHandler);
     }
 
     private CmgMessageHandler createMessageHandler() {
diff --git a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
index 8e667ee3a9..a96711ce5e 100644
--- a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
+++ b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
@@ -83,7 +83,7 @@ class ItCmgDisasterRecoveryTest extends ItSystemGroupDisasterRecoveryTest {
     }
 
     private void initiateCmgRepairVia(IgniteImpl conductor, int... newCmgIndexes) throws InterruptedException {
-        NodeMetadata nodeMetadata = conductor.node().nodeMetadata();
+        NodeMetadata nodeMetadata = obtainNodeMetadata(conductor);
 
         recoveryClient.initiateCmgRepair(nodeMetadata.restHost(), nodeMetadata.httpPort(), nodeNames(newCmgIndexes));
     }
diff --git a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
index a00573cf38..0739d76592 100644
--- a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
+++ b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.disaster.system;
 
+import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.app.IgniteServerImpl;
 import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NodeMetadata;
 import org.jetbrains.annotations.Nullable;
 
@@ -118,8 +120,8 @@ abstract class ItSystemGroupDisasterRecoveryTest extends ClusterPerTestIntegrati
     }
 
     void initiateMigrationToNewCluster(IgniteImpl nodeMissingRepair, IgniteImpl repairedNode) throws Exception {
-        NodeMetadata missingRepairMetadata = nodeMissingRepair.node().nodeMetadata();
-        NodeMetadata repairedMetadata = repairedNode.node().nodeMetadata();
+        NodeMetadata missingRepairMetadata = obtainNodeMetadata(nodeMissingRepair);
+        NodeMetadata repairedMetadata = obtainNodeMetadata(repairedNode);
 
         recoveryClient.initiateMigration(
                 missingRepairMetadata.restHost(),
@@ -128,4 +130,15 @@ abstract class ItSystemGroupDisasterRecoveryTest extends ClusterPerTestIntegrati
                 repairedMetadata.httpPort()
         );
     }
+
+    static NodeMetadata obtainNodeMetadata(IgniteImpl ignite) throws InterruptedException {
+        ClusterNode clusterNode = ignite.node();
+
+        assertTrue(
+                waitForCondition(() -> clusterNode.nodeMetadata() != null, SECONDS.toMillis(10)),
+                "Did not see " + ignite.name() + " to get metadata in time"
+        );
+
+        return requireNonNull(clusterNode.nodeMetadata());
+    }
 }

Reply via email to