This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8bccac1924 IGNITE-22688 Make async continuation executor configurable
in IgniteServer (#5044)
8bccac1924 is described below
commit 8bccac192403661741766b345201f49174890b23
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Jan 17 17:23:45 2025 +0400
IGNITE-22688 Make async continuation executor configurable in IgniteServer
(#5044)
---
.../catalog/ItCatalogApiThreadingTest.java | 4 +-
.../internal/runner/app/ItIgniteServerTest.java | 76 ++++++++++++-
.../main/java/org/apache/ignite/IgniteServer.java | 120 ++++++++++++++-------
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +-
.../ignite/internal/app/IgniteServerImpl.java | 31 +++---
5 files changed, 172 insertions(+), 61 deletions(-)
diff --git
a/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java
b/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java
index e95c83af15..e712c3dabf 100644
---
a/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java
+++
b/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java
@@ -84,8 +84,8 @@ class ItCatalogApiThreadingTest extends
ClusterPerClassIntegrationTest {
@ParameterizedTest
@EnumSource(CatalogAsyncOperation.class)
- @Disabled("IGNITE-22687 or IGNITE-22688")
- // TODO: enable this after IGNITE-22687 is fixed or after IGNITE-22688
(which will give possibility to distinguish the common FJP from
+ @Disabled("IGNITE-22687 or IGNITE-24204")
+ // TODO: enable this after IGNITE-22687 is fixed or after IGNITE-24204
(which will give possibility to distinguish the common FJP from
// the user-supplied async continuation executor).
void
catalogFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(CatalogAsyncOperation
operation) {
CompletableFuture<Thread> completerFuture =
forcingSwitchFromUserThread(
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteServerTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteServerTest.java
index 4564440a66..158ce22cbe 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteServerTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteServerTest.java
@@ -31,6 +31,8 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.ignite.IgniteServer;
import org.apache.ignite.InitParameters;
@@ -45,6 +47,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
/**
* IgniteServer interface tests.
@@ -133,7 +137,7 @@ class ItIgniteServerTest extends BaseIgniteAbstractTest {
@Test
void testNodesStartWithBootstrapConfiguration() {
for (Map.Entry<String, String> e : nodesBootstrapCfg.entrySet()) {
- startNode(e.getKey(), name -> startNode(name, e.getValue()));
+ startAndRegisterNode(e.getKey(), name -> startNode(name,
e.getValue()));
}
assertThat(startedIgniteServers, hasSize(3));
@@ -151,7 +155,7 @@ class ItIgniteServerTest extends BaseIgniteAbstractTest {
@Test
void testNodesStartWithBootstrapConfigurationInitializedCluster() {
for (Map.Entry<String, String> e : nodesBootstrapCfg.entrySet()) {
- startNode(e.getKey(), name -> startNode(name, e.getValue()));
+ startAndRegisterNode(e.getKey(), name -> startNode(name,
e.getValue()));
}
assertThat(startedIgniteServers, hasSize(3));
@@ -187,11 +191,32 @@ class ItIgniteServerTest extends BaseIgniteAbstractTest {
);
}
- private void startNode(String nodeName, Function<String, IgniteServer>
starter) {
+ @ParameterizedTest
+ @EnumSource
+ void differentStartKindsWork(StartKind startKind) {
+ for (Map.Entry<String, String> e : nodesBootstrapCfg.entrySet()) {
+ startAndRegisterNode(e.getKey(), name -> startNode(name,
e.getValue(), startKind));
+ }
+
+ assertThat(startedIgniteServers, hasSize(3));
+
+ IgniteServer igniteServer = startedIgniteServers.get(0);
+ InitParameters initParameters = InitParameters.builder()
+ .metaStorageNodes(igniteServer)
+ .clusterName("cluster")
+ .build();
+ assertThat(igniteServer.initClusterAsync(initParameters),
willCompleteSuccessfully());
+ }
+
+ private void startAndRegisterNode(String nodeName, Function<String,
IgniteServer> starter) {
startedIgniteServers.add(starter.apply(nodeName));
}
private IgniteServer startNode(String name, String config) {
+ return startNode(name, config, IgniteServer::start);
+ }
+
+ private IgniteServer startNode(String name, String config, Starter
starter) {
Path nodeWorkDir = workDir.resolve(name);
Path configPath = nodeWorkDir.resolve("ignite-config.conf");
try {
@@ -200,7 +225,50 @@ class ItIgniteServerTest extends BaseIgniteAbstractTest {
} catch (IOException ex) {
throw new RuntimeException(ex);
}
- return IgniteServer.start(name, configPath, nodeWorkDir);
+ return starter.start(name, configPath, nodeWorkDir);
+ }
+
+ @FunctionalInterface
+ private interface Starter {
+ IgniteServer start(String name, Path config, Path workDir);
}
+ enum StartKind implements Starter {
+ START {
+ @Override
+ public IgniteServer start(String name, Path config, Path workDir) {
+ return IgniteServer.start(name, config, workDir);
+ }
+ },
+ START_ASYNC_JOIN {
+ @Override
+ public IgniteServer start(String name, Path config, Path workDir) {
+ return interruptibleJoin(IgniteServer.startAsync(name, config,
workDir));
+ }
+ },
+ BUILD_START {
+ @Override
+ public IgniteServer start(String name, Path config, Path workDir) {
+ IgniteServer server = IgniteServer.builder(name, config,
workDir).build();
+ server.start();
+ return server;
+ }
+ },
+ BUILD_START_ASYNC_JOIN {
+ @Override
+ public IgniteServer start(String name, Path config, Path workDir) {
+ IgniteServer server = IgniteServer.builder(name, config,
workDir).build();
+ interruptibleJoin(server.startAsync());
+ return server;
+ }
+ };
+
+ private static <T> T interruptibleJoin(CompletableFuture<T> future) {
+ try {
+ return future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
diff --git a/modules/runner/src/main/java/org/apache/ignite/IgniteServer.java
b/modules/runner/src/main/java/org/apache/ignite/IgniteServer.java
index e6c79d1574..c7098d822a 100644
--- a/modules/runner/src/main/java/org/apache/ignite/IgniteServer.java
+++ b/modules/runner/src/main/java/org/apache/ignite/IgniteServer.java
@@ -19,6 +19,8 @@ package org.apache.ignite;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
import org.apache.ignite.internal.app.IgniteServerImpl;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
@@ -43,33 +45,21 @@ public interface IgniteServer {
* endpoint is functional).
*/
static CompletableFuture<IgniteServer> startAsync(String nodeName, Path
configPath, Path workDir) {
- return startAsync(nodeName, configPath, workDir, null);
+ IgniteServer server = builder(nodeName, configPath, workDir).build();
+ return server.startAsync().thenApply(unused -> server);
}
/**
- * Starts an embedded Ignite node with a configuration from a HOCON file
with an optional class loader for further usage by
- * {@link java.util.ServiceLoader}..
+ * Starts the node.
*
- * <p>When the future returned from this method completes, the node is
partially started, and is ready to accept the init command (that
- * is, its REST endpoint is functional).
+ * <p>When the returned future completes, the node is partially started
and ready to accept the init command (that is, its
+ * REST endpoint is functional).
*
- * @param nodeName Name of the node. Must not be {@code null}.
- * @param configPath Path to the node configuration in the HOCON format.
Must not be {@code null}.
- * @param workDir Work directory for the node. Must not be {@code null}.
- * @param serviceLoaderClassLoader The class loader to be used to load
provider-configuration files and provider classes, or
- * {@code null} if the system class loader (or, failing that, the
bootstrap class loader) is to be used
- * @return Future that will be completed when the node is partially
started, and is ready to accept the init command (that is, its REST
- * endpoint is functional).
+ * <p>Start can only be called once.
+ *
+ * @return Future that will be completed when the node is started.
*/
- static CompletableFuture<IgniteServer> startAsync(
- String nodeName,
- Path configPath,
- Path workDir,
- @Nullable ClassLoader serviceLoaderClassLoader
- ) {
- IgniteServerImpl embeddedNode = new IgniteServerImpl(nodeName,
configPath, workDir, serviceLoaderClassLoader);
- return embeddedNode.startAsync().thenApply(unused -> embeddedNode);
- }
+ CompletableFuture<Void> startAsync();
/**
* Starts an embedded Ignite node with a configuration from a HOCON file
synchronously.
@@ -83,31 +73,31 @@ public interface IgniteServer {
* @return Node instance.
*/
static IgniteServer start(String nodeName, Path configPath, Path workDir) {
- return start(nodeName, configPath, workDir, null);
+ IgniteServer server = builder(nodeName, configPath, workDir).build();
+ server.start();
+ return server;
}
/**
- * Starts an embedded Ignite node with a configuration from a HOCON file,
with an optional class loader for further usage by
- * {@link java.util.ServiceLoader} synchronously.
+ * Starts the node.
*
- * <p>When this method returns, the node is partially started, and is
ready to accept the init command (that is, its REST endpoint is
- * functional).
+ * <p>When this method returns, the node is partially started and ready to
accept the init command (that is, its
+ * REST endpoint is functional).
+ *
+ * <p>Start can only be called once.
+ */
+ void start();
+
+ /**
+ * Returns a builder for an embedded Ignite node.
*
* @param nodeName Name of the node. Must not be {@code null}.
* @param configPath Path to the node configuration in the HOCON format.
Must not be {@code null}.
* @param workDir Work directory for the node. Must not be {@code null}.
- * @param serviceLoaderClassLoader The class loader to be used to load
provider-configuration files and provider classes, or
- * {@code null} if the system class loader (or, failing that, the
bootstrap class loader) is to be used
+ * @return Node instance.
*/
- static IgniteServer start(
- String nodeName,
- Path configPath,
- Path workDir,
- @Nullable ClassLoader serviceLoaderClassLoader
- ) {
- IgniteServerImpl embeddedNode = new IgniteServerImpl(nodeName,
configPath, workDir, serviceLoaderClassLoader);
- embeddedNode.start();
- return embeddedNode;
+ static Builder builder(String nodeName, Path configPath, Path workDir) {
+ return new Builder(nodeName, configPath, workDir);
}
/**
@@ -191,4 +181,60 @@ public interface IgniteServer {
* @return Node name.
*/
String name();
+
+ /**
+ * Builder for IgniteServer.
+ */
+ final class Builder {
+ private final String nodeName;
+ private final Path configPath;
+ private final Path workDir;
+
+ private @Nullable ClassLoader serviceLoaderClassLoader;
+ private Executor asyncContinuationExecutor = ForkJoinPool.commonPool();
+
+ private Builder(String nodeName, Path configPath, Path workDir) {
+ this.nodeName = nodeName;
+ this.configPath = configPath;
+ this.workDir = workDir;
+ }
+
+ /**
+ * Specifies class loader to use when loading components via {@link
java.util.ServiceLoader}.
+ *
+ * @param serviceLoaderClassLoader The class loader to be used to load
provider-configuration files and provider classes, or
+ * {@code null} if the system class loader (or, failing that,
the bootstrap class loader) is to be used
+ * @return This instance for chaining.
+ */
+ public Builder serviceLoaderClassLoader(@Nullable ClassLoader
serviceLoaderClassLoader) {
+ this.serviceLoaderClassLoader = serviceLoaderClassLoader;
+ return this;
+ }
+
+ /**
+ * Specifies executor in which futures obtained via API will be
completed..
+ *
+ * @param asyncContinuationExecutor Executor in which futures obtained
via API will be completed.
+ * @return This instance for chaining.
+ */
+ public Builder asyncContinuationExecutor(Executor
asyncContinuationExecutor) {
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
+ return this;
+ }
+
+ /**
+ * Builds an IgniteServer. It is not started; {@link #start()} or
{@link #startAsync()} can be used to start it.
+ *
+ * @return Server instance.
+ */
+ public IgniteServer build() {
+ return new IgniteServerImpl(
+ nodeName,
+ configPath,
+ workDir,
+ serviceLoaderClassLoader,
+ asyncContinuationExecutor
+ );
+ }
+ }
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index a201512078..a45deb4825 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1291,7 +1291,7 @@ public class IgniteImpl implements Ignite {
/**
* Starts ignite node.
*
- * <p>When this method returns, the node is partially started and ready to
accept the init command (that is, its
+ * <p>When the returned future completes, the node is partially started
and ready to accept the init command (that is, its
* REST endpoint is functional).
*
* @return Future that will be completed when the node is started.
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
index ddd85812df..d0832e1e78 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
@@ -35,7 +35,6 @@ import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteServer;
@@ -141,8 +140,15 @@ public class IgniteServerImpl implements IgniteServer {
* @param workDir Work directory for the started node. Must not be {@code
null}.
* @param classLoader The class loader to be used to load
provider-configuration files and provider classes, or {@code null} if
* the system class loader (or, failing that, the bootstrap class
loader) is to be used
+ * @param asyncContinuationExecutor Executor in which user-facing futures
will be completed.
*/
- public IgniteServerImpl(String nodeName, Path configPath, Path workDir,
@Nullable ClassLoader classLoader) {
+ public IgniteServerImpl(
+ String nodeName,
+ Path configPath,
+ Path workDir,
+ @Nullable ClassLoader classLoader,
+ Executor asyncContinuationExecutor
+ ) {
if (nodeName == null) {
throw new NodeStartException("Node name must not be null");
}
@@ -158,13 +164,16 @@ public class IgniteServerImpl implements IgniteServer {
if (workDir == null) {
throw new NodeStartException("Working directory must not be null");
}
+ if (asyncContinuationExecutor == null) {
+ throw new NodeStartException("Async continuation executor must not
be null");
+ }
this.nodeName = nodeName;
this.configPath = configPath;
this.workDir = workDir;
this.classLoader = classLoader;
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
- asyncContinuationExecutor = ForkJoinPool.commonPool();
attachmentLock = new IgniteAttachmentLock(() -> ignite,
asyncContinuationExecutor);
publicIgnite = new RestartProofIgnite(attachmentLock);
}
@@ -364,14 +373,7 @@ public class IgniteServerImpl implements IgniteServer {
return nodeName;
}
- /**
- * Starts ignite node.
- *
- * <p>When this method returns, the node is partially started and ready to
accept the init command (that is, its
- * REST endpoint is functional).
- *
- * @return Future that will be completed when the node is started.
- */
+ @Override
public CompletableFuture<Void> startAsync() {
if (ignite != null) {
throw new NodeStartException("Node is already started.");
@@ -411,12 +413,7 @@ public class IgniteServerImpl implements IgniteServer {
}).thenCompose(identity());
}
- /**
- * Starts ignite node.
- *
- * <p>When this method returns, the node is partially started and ready to
accept the init command (that is, its
- * REST endpoint is functional).
- */
+ @Override
public void start() {
sync(startAsync());
}