This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d32e51bcbc IGNITE-23029 Cluster#startAndInit() should wait for node
registration (#4249)
d32e51bcbc is described below
commit d32e51bcbcac4d86272bfec0157d5a5e8e162003
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Aug 20 15:19:37 2024 +0400
IGNITE-23029 Cluster#startAndInit() should wait for node registration
(#4249)
---
.../ItSqlReplCommandNotInitialedClusterTest.java | 6 +-
.../management/topology/ItLogicalTopologyTest.java | 2 +-
.../failure/handlers/FailureHandlerTest.java | 7 ++-
.../apache/ignite/internal/start/ItStartTest.java | 2 +-
.../java/org/apache/ignite/internal/Cluster.java | 71 ++++++++++++++--------
.../org/apache/ignite/internal/ssl/ItSslTest.java | 14 ++---
6 files changed, 60 insertions(+), 42 deletions(-)
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/ItSqlReplCommandNotInitialedClusterTest.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/ItSqlReplCommandNotInitialedClusterTest.java
index f91675c0b0..716d449084 100644
---
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/ItSqlReplCommandNotInitialedClusterTest.java
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/ItSqlReplCommandNotInitialedClusterTest.java
@@ -61,9 +61,9 @@ public class ItSqlReplCommandNotInitialedClusterTest extends
CliIntegrationTest
() -> assertErrOutputContains("Connection refused:")
);
- IgniteServer node0 = CLUSTER.startEmbeddedNode(0);
- IgniteServer node1 = CLUSTER.startEmbeddedNode(1);
- IgniteServer node2 = CLUSTER.startEmbeddedNode(2);
+ IgniteServer node0 = CLUSTER.startEmbeddedNode(0).server();
+ IgniteServer node1 = CLUSTER.startEmbeddedNode(1).server();
+ IgniteServer node2 = CLUSTER.startEmbeddedNode(2).server();
session.onConnect(SessionInfo
.builder()
diff --git
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/topology/ItLogicalTopologyTest.java
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/topology/ItLogicalTopologyTest.java
index fc76add454..87905f58fa 100644
---
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/topology/ItLogicalTopologyTest.java
+++
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/topology/ItLogicalTopologyTest.java
@@ -331,7 +331,7 @@ class ItLogicalTopologyTest extends
ClusterPerTestIntegrationTest {
}
});
- IgniteServer node = cluster.startEmbeddedNode(1);
+ IgniteServer node = cluster.startEmbeddedNode(1).server();
try {
Event event = events.poll(10, TimeUnit.SECONDS);
diff --git
a/modules/failure-handler/src/integrationTest/java/org/apache/ignite/internal/failure/handlers/FailureHandlerTest.java
b/modules/failure-handler/src/integrationTest/java/org/apache/ignite/internal/failure/handlers/FailureHandlerTest.java
index eaa75aa7bd..ae36bb0086 100755
---
a/modules/failure-handler/src/integrationTest/java/org/apache/ignite/internal/failure/handlers/FailureHandlerTest.java
+++
b/modules/failure-handler/src/integrationTest/java/org/apache/ignite/internal/failure/handlers/FailureHandlerTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.ignite.IgniteServer;
+import org.apache.ignite.internal.Cluster.ServerRegistration;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureType;
@@ -54,10 +55,10 @@ public class FailureHandlerTest extends
ClusterPerTestIntegrationTest {
}
private void testFailureHandler(Function<IgniteServer, FailureHandler>
handlerFactory) {
- IgniteServer node = cluster.startEmbeddedNode(0);
- CompletableFuture<Void> fut = node.waitForInitAsync();
+ ServerRegistration registration = cluster.startEmbeddedNode(0);
+ CompletableFuture<Void> fut = registration.registrationFuture();
- FailureHandler hnd = handlerFactory.apply(node);
+ FailureHandler hnd = handlerFactory.apply(registration.server());
hnd.onFailure(
new FailureContext(
FailureType.CRITICAL_ERROR,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/start/ItStartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/start/ItStartTest.java
index 338652b674..8915cbedeb 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/start/ItStartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/start/ItStartTest.java
@@ -135,7 +135,7 @@ class ItStartTest extends BaseIgniteAbstractTest {
AtomicReference<String> threadNameRef = new AtomicReference<>();
- IgniteServer node = cluster.startEmbeddedNode(1);
+ IgniteServer node = cluster.startEmbeddedNode(1).server();
CompletableFuture<Void> future =
node.waitForInitAsync().whenComplete((res, ex) -> {
threadNameRef.set(Thread.currentThread().getName());
});
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
index 1f3eba9b8b..e509923b63 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/Cluster.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -37,10 +38,9 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
@@ -217,11 +217,14 @@ public class Cluster {
initialClusterSize = nodeCount;
- List<IgniteServer> nodes = IntStream.range(0, nodeCount)
+ List<ServerRegistration> nodeRegistrations = IntStream.range(0,
nodeCount)
.mapToObj(nodeIndex -> startEmbeddedNode(nodeIndex,
nodeBootstrapConfigTemplate))
.collect(toList());
- List<IgniteServer> metaStorageAndCmgNodes =
Arrays.stream(cmgNodes).mapToObj(nodes::get).collect(toList());
+ List<IgniteServer> metaStorageAndCmgNodes = Arrays.stream(cmgNodes)
+ .mapToObj(nodeRegistrations::get)
+ .map(ServerRegistration::server)
+ .collect(toList());
InitParametersBuilder builder = InitParameters.builder()
.metaStorageNodes(metaStorageAndCmgNodes)
@@ -231,31 +234,31 @@ public class Cluster {
TestIgnitionManager.init(metaStorageAndCmgNodes.get(0),
builder.build());
- for (IgniteServer node : nodes) {
- assertThat(node.waitForInitAsync(), willCompleteSuccessfully());
+ for (ServerRegistration registration : nodeRegistrations) {
+ assertThat(registration.registrationFuture(),
willCompleteSuccessfully());
}
started = true;
}
/**
- * Starts a cluster node with the default bootstrap config template and
returns its startup future.
+ * Starts a cluster node with the default bootstrap config template.
*
* @param nodeIndex Index of the node to start.
- * @return Future that will be completed when the node starts.
+ * @return Started server and its registration future.
*/
- public IgniteServer startEmbeddedNode(int nodeIndex) {
+ public ServerRegistration startEmbeddedNode(int nodeIndex) {
return startEmbeddedNode(nodeIndex,
defaultNodeBootstrapConfigTemplate);
}
/**
- * Starts a cluster node and returns its startup future.
+ * Starts a cluster node.
*
* @param nodeIndex Index of the node to start.
* @param nodeBootstrapConfigTemplate Bootstrap config template to use for
this node.
- * @return Future that will be completed when the node starts.
+ * @return Started server and its registration future.
*/
- public IgniteServer startEmbeddedNode(int nodeIndex, String
nodeBootstrapConfigTemplate) {
+ public ServerRegistration startEmbeddedNode(int nodeIndex, String
nodeBootstrapConfigTemplate) {
String nodeName = testNodeName(testInfo, nodeIndex);
String config = IgniteStringFormatter.format(
@@ -270,7 +273,7 @@ public class Cluster {
IgniteServer node = TestIgnitionManager.start(nodeName, config,
workDir.resolve(nodeName));
setListAtIndex(igniteServers, nodeIndex, node);
- node.waitForInitAsync().thenRun(() -> {
+ CompletableFuture<Void> registrationFuture =
node.waitForInitAsync().thenRun(() -> {
synchronized (nodes) {
setListAtIndex(nodes, nodeIndex, node.api());
}
@@ -281,7 +284,8 @@ public class Cluster {
node.shutdown();
}
});
- return node;
+
+ return new ServerRegistration(node, registrationFuture);
}
private static <T> void setListAtIndex(List<T> list, int i, T element) {
@@ -345,19 +349,9 @@ public class Cluster {
* is not initialized, the node is returned in a state in which it is
ready to join the cluster).
*/
public Ignite startNode(int index, String nodeBootstrapConfigTemplate) {
- Ignite newIgniteNode;
-
- try {
- IgniteServer node = startEmbeddedNode(index,
nodeBootstrapConfigTemplate);
- node.waitForInitAsync().get(20, TimeUnit.SECONDS);
- newIgniteNode = node.api();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new RuntimeException(e);
- } catch (ExecutionException | TimeoutException e) {
- throw new RuntimeException(e);
- }
+ ServerRegistration registration = startEmbeddedNode(index,
nodeBootstrapConfigTemplate);
+ assertThat(registration.registrationFuture(), willSucceedIn(20,
TimeUnit.SECONDS));
+ Ignite newIgniteNode = registration.server().api();
assertEquals(newIgniteNode, nodes.get(index));
@@ -663,4 +657,27 @@ public class Cluster {
|| (prevPredicate != null &&
prevPredicate.test(recipientConsistentId, networkMessage));
}
}
+
+ /** {@link IgniteServer} and future that completes when the server gets
started, joins and gets registered with a Cluster. */
+ public static class ServerRegistration {
+ private final IgniteServer server;
+ private final CompletableFuture<Void> registrationFuture;
+
+ public ServerRegistration(IgniteServer server, CompletableFuture<Void>
registrationFuture) {
+ this.server = server;
+ this.registrationFuture = registrationFuture;
+ }
+
+ /** Returns IgniteServer. */
+ public IgniteServer server() {
+ return server;
+ }
+
+ /**
+ * Returns a future that gets completed when the server gets started,
joins and gets registered with the Cluster which starts it.
+ */
+ public CompletableFuture<Void> registrationFuture() {
+ return registrationFuture;
+ }
+ }
}
diff --git
a/modules/security/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
b/modules/security/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
index 74a748f9f7..35437a84ec 100644
---
a/modules/security/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
+++
b/modules/security/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
@@ -34,12 +34,12 @@ import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import org.apache.ignite.IgniteServer;
import org.apache.ignite.InitParameters;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.client.IgniteClientConnectionException;
import org.apache.ignite.client.SslConfiguration;
import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.Cluster.ServerRegistration;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -521,20 +521,20 @@ public class ItSslTest extends BaseIgniteAbstractTest {
String sslEnabledWithCipher1BoostrapConfig =
createBoostrapConfig("TLS_AES_256_GCM_SHA384");
String sslEnabledWithCipher2BoostrapConfig =
createBoostrapConfig("TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384");
- IgniteServer node1 = incompatibleTestCluster.startEmbeddedNode(10,
sslEnabledWithCipher1BoostrapConfig);
+ ServerRegistration successfulRegistration =
incompatibleTestCluster.startEmbeddedNode(10,
sslEnabledWithCipher1BoostrapConfig);
InitParameters initParameters = InitParameters.builder()
- .metaStorageNodes(node1)
+ .metaStorageNodes(successfulRegistration.server())
.clusterName("cluster")
.build();
- TestIgnitionManager.init(node1, initParameters);
+ TestIgnitionManager.init(successfulRegistration.server(),
initParameters);
// First node will initialize the cluster with single node
successfully since the second node can't connect to it.
- assertThat(node1.waitForInitAsync(), willCompleteSuccessfully());
+ assertThat(successfulRegistration.registrationFuture(),
willCompleteSuccessfully());
- IgniteServer node2 = incompatibleTestCluster.startEmbeddedNode(11,
sslEnabledWithCipher2BoostrapConfig);
- assertThat(node2.waitForInitAsync(), willTimeoutIn(1,
TimeUnit.SECONDS));
+ ServerRegistration failingRegistration =
incompatibleTestCluster.startEmbeddedNode(11,
sslEnabledWithCipher2BoostrapConfig);
+ assertThat(failingRegistration.registrationFuture(),
willTimeoutIn(1, TimeUnit.SECONDS));
} finally {
incompatibleTestCluster.shutdown();
}