This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8bccac1924 IGNITE-22688 Make async continuation executor configurable 
in IgniteServer (#5044)
8bccac1924 is described below

commit 8bccac192403661741766b345201f49174890b23
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Jan 17 17:23:45 2025 +0400

    IGNITE-22688 Make async continuation executor configurable in IgniteServer 
(#5044)
---
 .../catalog/ItCatalogApiThreadingTest.java         |   4 +-
 .../internal/runner/app/ItIgniteServerTest.java    |  76 ++++++++++++-
 .../main/java/org/apache/ignite/IgniteServer.java  | 120 ++++++++++++++-------
 .../org/apache/ignite/internal/app/IgniteImpl.java |   2 +-
 .../ignite/internal/app/IgniteServerImpl.java      |  31 +++---
 5 files changed, 172 insertions(+), 61 deletions(-)

diff --git 
a/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java
 
b/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java
index e95c83af15..e712c3dabf 100644
--- 
a/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java
+++ 
b/modules/catalog-dsl/src/integrationTest/java/org/apache/ignite/internal/catalog/ItCatalogApiThreadingTest.java
@@ -84,8 +84,8 @@ class ItCatalogApiThreadingTest extends 
ClusterPerClassIntegrationTest {
 
     @ParameterizedTest
     @EnumSource(CatalogAsyncOperation.class)
-    @Disabled("IGNITE-22687 or IGNITE-22688")
-    // TODO: enable this after IGNITE-22687 is fixed or after IGNITE-22688 
(which will give possibility to distinguish the common FJP from
+    @Disabled("IGNITE-22687 or IGNITE-24204")
+    // TODO: enable this after IGNITE-22687 is fixed or after IGNITE-24204 
(which will make it possible to distinguish the common FJP from
     // the user-supplied async continuation executor).
     void 
catalogFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(CatalogAsyncOperation
 operation) {
         CompletableFuture<Thread> completerFuture = 
forcingSwitchFromUserThread(
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteServerTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteServerTest.java
index 4564440a66..158ce22cbe 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteServerTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteServerTest.java
@@ -31,6 +31,8 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import org.apache.ignite.IgniteServer;
 import org.apache.ignite.InitParameters;
@@ -45,6 +47,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 /**
  * IgniteServer interface tests.
@@ -133,7 +137,7 @@ class ItIgniteServerTest extends BaseIgniteAbstractTest {
     @Test
     void testNodesStartWithBootstrapConfiguration() {
         for (Map.Entry<String, String> e : nodesBootstrapCfg.entrySet()) {
-            startNode(e.getKey(), name -> startNode(name, e.getValue()));
+            startAndRegisterNode(e.getKey(), name -> startNode(name, 
e.getValue()));
         }
 
         assertThat(startedIgniteServers, hasSize(3));
@@ -151,7 +155,7 @@ class ItIgniteServerTest extends BaseIgniteAbstractTest {
     @Test
     void testNodesStartWithBootstrapConfigurationInitializedCluster() {
         for (Map.Entry<String, String> e : nodesBootstrapCfg.entrySet()) {
-            startNode(e.getKey(), name -> startNode(name, e.getValue()));
+            startAndRegisterNode(e.getKey(), name -> startNode(name, 
e.getValue()));
         }
 
         assertThat(startedIgniteServers, hasSize(3));
@@ -187,11 +191,32 @@ class ItIgniteServerTest extends BaseIgniteAbstractTest {
         );
     }
 
-    private void startNode(String nodeName, Function<String, IgniteServer> 
starter) {
+    @ParameterizedTest
+    @EnumSource
+    void differentStartKindsWork(StartKind startKind) {
+        for (Map.Entry<String, String> e : nodesBootstrapCfg.entrySet()) {
+            startAndRegisterNode(e.getKey(), name -> startNode(name, 
e.getValue(), startKind));
+        }
+
+        assertThat(startedIgniteServers, hasSize(3));
+
+        IgniteServer igniteServer = startedIgniteServers.get(0);
+        InitParameters initParameters = InitParameters.builder()
+                .metaStorageNodes(igniteServer)
+                .clusterName("cluster")
+                .build();
+        assertThat(igniteServer.initClusterAsync(initParameters), 
willCompleteSuccessfully());
+    }
+
+    private void startAndRegisterNode(String nodeName, Function<String, 
IgniteServer> starter) {
         startedIgniteServers.add(starter.apply(nodeName));
     }
 
     private IgniteServer startNode(String name, String config) {
+        return startNode(name, config, IgniteServer::start);
+    }
+
+    private IgniteServer startNode(String name, String config, Starter 
starter) {
         Path nodeWorkDir = workDir.resolve(name);
         Path configPath = nodeWorkDir.resolve("ignite-config.conf");
         try {
@@ -200,7 +225,50 @@ class ItIgniteServerTest extends BaseIgniteAbstractTest {
         } catch (IOException ex) {
             throw new RuntimeException(ex);
         }
-        return IgniteServer.start(name, configPath, nodeWorkDir);
+        return starter.start(name, configPath, nodeWorkDir);
+    }
+
+    @FunctionalInterface
+    private interface Starter {
+        IgniteServer start(String name, Path config, Path workDir);
     }
 
+    enum StartKind implements Starter {
+        START {
+            @Override
+            public IgniteServer start(String name, Path config, Path workDir) {
+                return IgniteServer.start(name, config, workDir);
+            }
+        },
+        START_ASYNC_JOIN {
+            @Override
+            public IgniteServer start(String name, Path config, Path workDir) {
+                return interruptibleJoin(IgniteServer.startAsync(name, config, 
workDir));
+            }
+        },
+        BUILD_START {
+            @Override
+            public IgniteServer start(String name, Path config, Path workDir) {
+                IgniteServer server = IgniteServer.builder(name, config, 
workDir).build();
+                server.start();
+                return server;
+            }
+        },
+        BUILD_START_ASYNC_JOIN {
+            @Override
+            public IgniteServer start(String name, Path config, Path workDir) {
+                IgniteServer server = IgniteServer.builder(name, config, 
workDir).build();
+                interruptibleJoin(server.startAsync());
+                return server;
+            }
+        };
+
+        private static <T> T interruptibleJoin(CompletableFuture<T> future) {
+            try {
+                return future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
 }
diff --git a/modules/runner/src/main/java/org/apache/ignite/IgniteServer.java 
b/modules/runner/src/main/java/org/apache/ignite/IgniteServer.java
index e6c79d1574..c7098d822a 100644
--- a/modules/runner/src/main/java/org/apache/ignite/IgniteServer.java
+++ b/modules/runner/src/main/java/org/apache/ignite/IgniteServer.java
@@ -19,6 +19,8 @@ package org.apache.ignite;
 
 import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import org.apache.ignite.internal.app.IgniteServerImpl;
 import org.apache.ignite.lang.IgniteException;
 import org.jetbrains.annotations.Nullable;
@@ -43,33 +45,21 @@ public interface IgniteServer {
      *         endpoint is functional).
      */
     static CompletableFuture<IgniteServer> startAsync(String nodeName, Path 
configPath, Path workDir) {
-        return startAsync(nodeName, configPath, workDir, null);
+        IgniteServer server = builder(nodeName, configPath, workDir).build();
+        return server.startAsync().thenApply(unused -> server);
     }
 
     /**
-     * Starts an embedded Ignite node with a configuration from a HOCON file 
with an optional class loader for further usage by
-     * {@link java.util.ServiceLoader}..
+     * Starts the node.
      *
-     * <p>When the future returned from this method completes, the node is 
partially started, and is ready to accept the init command (that
-     * is, its REST endpoint is functional).
+     * <p>When the returned future completes, the node is partially started 
and ready to accept the init command (that is, its
+     * REST endpoint is functional).
      *
-     * @param nodeName Name of the node. Must not be {@code null}.
-     * @param configPath Path to the node configuration in the HOCON format. 
Must not be {@code null}.
-     * @param workDir Work directory for the node. Must not be {@code null}.
-     * @param serviceLoaderClassLoader The class loader to be used to load 
provider-configuration files and provider classes, or
-     *         {@code null} if the system class loader (or, failing that, the 
bootstrap class loader) is to be used
-     * @return Future that will be completed when the node is partially 
started, and is ready to accept the init command (that is, its REST
-     *         endpoint is functional).
+     * <p>Start can only be called once.
+     *
+     * @return Future that will be completed when the node is started.
      */
-    static CompletableFuture<IgniteServer> startAsync(
-            String nodeName,
-            Path configPath,
-            Path workDir,
-            @Nullable ClassLoader serviceLoaderClassLoader
-    ) {
-        IgniteServerImpl embeddedNode = new IgniteServerImpl(nodeName, 
configPath, workDir, serviceLoaderClassLoader);
-        return embeddedNode.startAsync().thenApply(unused -> embeddedNode);
-    }
+    CompletableFuture<Void> startAsync();
 
     /**
      * Starts an embedded Ignite node with a configuration from a HOCON file 
synchronously.
@@ -83,31 +73,31 @@ public interface IgniteServer {
      * @return Node instance.
      */
     static IgniteServer start(String nodeName, Path configPath, Path workDir) {
-        return start(nodeName, configPath, workDir, null);
+        IgniteServer server = builder(nodeName, configPath, workDir).build();
+        server.start();
+        return server;
     }
 
     /**
-     * Starts an embedded Ignite node with a configuration from a HOCON file, 
with an optional class loader for further usage by
-     * {@link java.util.ServiceLoader} synchronously.
+     * Starts the node.
      *
-     * <p>When this method returns, the node is partially started, and is 
ready to accept the init command (that is, its REST endpoint is
-     * functional).
+     * <p>When this method returns, the node is partially started and ready to 
accept the init command (that is, its
+     * REST endpoint is functional).
+     *
+     * <p>Start can only be called once.
+     */
+    void start();
+
+    /**
+     * Returns a builder for an embedded Ignite node.
      *
      * @param nodeName Name of the node. Must not be {@code null}.
      * @param configPath Path to the node configuration in the HOCON format. 
Must not be {@code null}.
      * @param workDir Work directory for the node. Must not be {@code null}.
-     * @param serviceLoaderClassLoader The class loader to be used to load 
provider-configuration files and provider classes, or
-     *         {@code null} if the system class loader (or, failing that, the 
bootstrap class loader) is to be used
+     * @return Builder instance.
      */
-    static IgniteServer start(
-            String nodeName,
-            Path configPath,
-            Path workDir,
-            @Nullable ClassLoader serviceLoaderClassLoader
-    ) {
-        IgniteServerImpl embeddedNode = new IgniteServerImpl(nodeName, 
configPath, workDir, serviceLoaderClassLoader);
-        embeddedNode.start();
-        return embeddedNode;
+    static Builder builder(String nodeName, Path configPath, Path workDir) {
+        return new Builder(nodeName, configPath, workDir);
     }
 
     /**
@@ -191,4 +181,60 @@ public interface IgniteServer {
      * @return Node name.
      */
     String name();
+
+    /**
+     * Builder for IgniteServer.
+     */
+    final class Builder {
+        private final String nodeName;
+        private final Path configPath;
+        private final Path workDir;
+
+        private @Nullable ClassLoader serviceLoaderClassLoader;
+        private Executor asyncContinuationExecutor = ForkJoinPool.commonPool();
+
+        private Builder(String nodeName, Path configPath, Path workDir) {
+            this.nodeName = nodeName;
+            this.configPath = configPath;
+            this.workDir = workDir;
+        }
+
+        /**
+         * Specifies class loader to use when loading components via {@link 
java.util.ServiceLoader}.
+         *
+         * @param serviceLoaderClassLoader The class loader to be used to load 
provider-configuration files and provider classes, or
+         *         {@code null} if the system class loader (or, failing that, 
the bootstrap class loader) is to be used
+         * @return This instance for chaining.
+         */
+        public Builder serviceLoaderClassLoader(@Nullable ClassLoader 
serviceLoaderClassLoader) {
+            this.serviceLoaderClassLoader = serviceLoaderClassLoader;
+            return this;
+        }
+
+        /**
+         * Specifies executor in which futures obtained via API will be 
completed.
+         *
+         * @param asyncContinuationExecutor Executor in which futures obtained 
via API will be completed.
+         * @return This instance for chaining.
+         */
+        public Builder asyncContinuationExecutor(Executor 
asyncContinuationExecutor) {
+            this.asyncContinuationExecutor = asyncContinuationExecutor;
+            return this;
+        }
+
+        /**
+         * Builds an IgniteServer. It is not started; {@link #start()} or 
{@link #startAsync()} can be used to start it.
+         *
+         * @return Server instance.
+         */
+        public IgniteServer build() {
+            return new IgniteServerImpl(
+                    nodeName,
+                    configPath,
+                    workDir,
+                    serviceLoaderClassLoader,
+                    asyncContinuationExecutor
+            );
+        }
+    }
 }
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index a201512078..a45deb4825 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1291,7 +1291,7 @@ public class IgniteImpl implements Ignite {
     /**
      * Starts ignite node.
      *
-     * <p>When this method returns, the node is partially started and ready to 
accept the init command (that is, its
+     * <p>When the returned future completes, the node is partially started 
and ready to accept the init command (that is, its
      * REST endpoint is functional).
      *
      * @return Future that will be completed when the node is started.
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
index ddd85812df..d0832e1e78 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteServerImpl.java
@@ -35,7 +35,6 @@ import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ForkJoinPool;
 import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteServer;
@@ -141,8 +140,15 @@ public class IgniteServerImpl implements IgniteServer {
      * @param workDir Work directory for the started node. Must not be {@code 
null}.
      * @param classLoader The class loader to be used to load 
provider-configuration files and provider classes, or {@code null} if
      *         the system class loader (or, failing that, the bootstrap class 
loader) is to be used
+     * @param asyncContinuationExecutor Executor in which user-facing futures 
will be completed.
      */
-    public IgniteServerImpl(String nodeName, Path configPath, Path workDir, 
@Nullable ClassLoader classLoader) {
+    public IgniteServerImpl(
+            String nodeName,
+            Path configPath,
+            Path workDir,
+            @Nullable ClassLoader classLoader,
+            Executor asyncContinuationExecutor
+    ) {
         if (nodeName == null) {
             throw new NodeStartException("Node name must not be null");
         }
@@ -158,13 +164,16 @@ public class IgniteServerImpl implements IgniteServer {
         if (workDir == null) {
             throw new NodeStartException("Working directory must not be null");
         }
+        if (asyncContinuationExecutor == null) {
+            throw new NodeStartException("Async continuation executor must not 
be null");
+        }
 
         this.nodeName = nodeName;
         this.configPath = configPath;
         this.workDir = workDir;
         this.classLoader = classLoader;
+        this.asyncContinuationExecutor = asyncContinuationExecutor;
 
-        asyncContinuationExecutor = ForkJoinPool.commonPool();
         attachmentLock = new IgniteAttachmentLock(() -> ignite, 
asyncContinuationExecutor);
         publicIgnite = new RestartProofIgnite(attachmentLock);
     }
@@ -364,14 +373,7 @@ public class IgniteServerImpl implements IgniteServer {
         return nodeName;
     }
 
-    /**
-     * Starts ignite node.
-     *
-     * <p>When this method returns, the node is partially started and ready to 
accept the init command (that is, its
-     * REST endpoint is functional).
-     *
-     * @return Future that will be completed when the node is started.
-     */
+    @Override
     public CompletableFuture<Void> startAsync() {
         if (ignite != null) {
             throw new NodeStartException("Node is already started.");
@@ -411,12 +413,7 @@ public class IgniteServerImpl implements IgniteServer {
         }).thenCompose(identity());
     }
 
-    /**
-     * Starts ignite node.
-     *
-     * <p>When this method returns, the node is partially started and ready to 
accept the init command (that is, its
-     * REST endpoint is functional).
-     */
+    @Override
     public void start() {
         sync(startAsync());
     }

Reply via email to