This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0eb8ea4ba1 IGNITE-22783 Handle CMG-related network messages in CMG thread pool (#4467)
0eb8ea4ba1 is described below
commit 0eb8ea4ba14665f40028522970e612a74b25514d
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Sep 27 19:15:31 2024 +0400
IGNITE-22783 Handle CMG-related network messages in CMG thread pool (#4467)
---
.../management/ClusterManagementGroupManager.java | 2 +-
.../disaster/system/ItCmgDisasterRecoveryTest.java | 2 +-
.../system/ItSystemGroupDisasterRecoveryTest.java | 17 +++++++++++++++--
3 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 08f6250118..a7350f9c91 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -198,7 +198,7 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster
cmgMessageHandler = createMessageHandler();
- clusterService.messagingService().addMessageHandler(CmgMessageGroup.class, cmgMessageHandler);
+ clusterService.messagingService().addMessageHandler(CmgMessageGroup.class, message -> scheduledExecutor, cmgMessageHandler);
}
private CmgMessageHandler createMessageHandler() {
diff --git a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
index 8e667ee3a9..a96711ce5e 100644
--- a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
+++ b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItCmgDisasterRecoveryTest.java
@@ -83,7 +83,7 @@ class ItCmgDisasterRecoveryTest extends ItSystemGroupDisasterRecoveryTest {
}
private void initiateCmgRepairVia(IgniteImpl conductor, int... newCmgIndexes) throws InterruptedException {
- NodeMetadata nodeMetadata = conductor.node().nodeMetadata();
+ NodeMetadata nodeMetadata = obtainNodeMetadata(conductor);
recoveryClient.initiateCmgRepair(nodeMetadata.restHost(), nodeMetadata.httpPort(), nodeNames(newCmgIndexes));
}
diff --git a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
index a00573cf38..0739d76592 100644
--- a/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
+++ b/modules/system-disaster-recovery/src/integrationTest/java/org/apache/ignite/internal/disaster/system/ItSystemGroupDisasterRecoveryTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.disaster.system;
+import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.app.IgniteServerImpl;
import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NodeMetadata;
import org.jetbrains.annotations.Nullable;
@@ -118,8 +120,8 @@ abstract class ItSystemGroupDisasterRecoveryTest extends ClusterPerTestIntegrati
}
void initiateMigrationToNewCluster(IgniteImpl nodeMissingRepair, IgniteImpl repairedNode) throws Exception {
- NodeMetadata missingRepairMetadata = nodeMissingRepair.node().nodeMetadata();
- NodeMetadata repairedMetadata = repairedNode.node().nodeMetadata();
+ NodeMetadata missingRepairMetadata = obtainNodeMetadata(nodeMissingRepair);
+ NodeMetadata repairedMetadata = obtainNodeMetadata(repairedNode);
recoveryClient.initiateMigration(
missingRepairMetadata.restHost(),
@@ -128,4 +130,15 @@ abstract class ItSystemGroupDisasterRecoveryTest extends ClusterPerTestIntegrati
repairedMetadata.httpPort()
);
}
+
+ static NodeMetadata obtainNodeMetadata(IgniteImpl ignite) throws InterruptedException {
+ ClusterNode clusterNode = ignite.node();
+
+ assertTrue(
+ waitForCondition(() -> clusterNode.nodeMetadata() != null, SECONDS.toMillis(10)),
+ "Did not see " + ignite.name() + " to get metadata in time"
+ );
+
+ return requireNonNull(clusterNode.nodeMetadata());
+ }
}