This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new aa787089d7e IGNITE-27799 Remove blocking local node creation (#7569)
aa787089d7e is described below

commit aa787089d7ea4aa69b28e3740d008b6eda2d0c8b
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Feb 11 10:00:10 2026 +0200

    IGNITE-27799 Remove blocking local node creation (#7569)
---
 modules/network/build.gradle                       |   1 +
 .../internal/network/ItStaticNodeFinderTest.java   | 109 ++++++++++++++++-----
 .../network/netty/ItConnectionManagerTest.java     |  19 ++--
 .../internal/network/netty/ConnectionManager.java  |  78 +++------------
 .../ignite/internal/network/netty/NettyServer.java |  36 +++----
 .../scalecube/ScaleCubeClusterServiceFactory.java  |  96 +++++++++---------
 .../network/DefaultMessagingServiceTest.java       |   6 +-
 .../internal/network/netty/NettyServerTest.java    |  18 +++-
 8 files changed, 185 insertions(+), 178 deletions(-)

diff --git a/modules/network/build.gradle b/modules/network/build.gradle
index c1be1499a2e..fbfe99977b9 100644
--- a/modules/network/build.gradle
+++ b/modules/network/build.gradle
@@ -92,6 +92,7 @@ dependencies {
     integrationTestImplementation libs.netty.handler
     integrationTestImplementation libs.scalecube.cluster
     integrationTestImplementation libs.javapoet
+    integrationTestImplementation libs.typesafe.config
 }
 
 tasks.withType(Test).configureEach {
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/ItStaticNodeFinderTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/ItStaticNodeFinderTest.java
index 18267e7ad6f..79f6225f73d 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/ItStaticNodeFinderTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/ItStaticNodeFinderTest.java
@@ -17,47 +17,112 @@
 
 package org.apache.ignite.internal.network;
 
+import static 
org.apache.ignite.internal.ClusterConfiguration.DEFAULT_BASE_PORT;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
 import static 
org.apache.ignite.lang.ErrorGroups.Network.ADDRESS_UNRESOLVED_ERR;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import com.typesafe.config.parser.ConfigDocumentFactory;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.NodeBootstrapConfigUpdater;
+import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.testframework.log4j2.LogInspector;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 
-/**
- * Tests that node finder failure causes node shutdown.
- */
-class ItStaticNodeFinderTest extends ClusterPerClassIntegrationTest {
+class ItStaticNodeFinderTest extends ClusterPerTestIntegrationTest {
     @Override
     protected int initialNodes() {
-        return 1;
-    }
-
-    @Override
-    protected String getNodeBootstrapConfigTemplate() {
-        return "ignite {\n"
-                + "  network: {\n"
-                + "    nodeFinder.netClusterNodes: [ \"bad.host:1234\" ]\n"
-                + "  },\n"
-                + "}";
-    }
-
-    @Override
-    protected boolean needInitializeCluster() {
-        return false;
+        return 0;
     }
 
+    /** Tests that node finder failure causes node shutdown. */
     @Test
     void testNodeShutdownOnNodeFinderFailure(TestInfo testInfo) {
         Throwable throwable = assertThrowsWithCause(
-                () -> CLUSTER.startAndInit(testInfo, initialNodes(), 
cmgMetastoreNodes(), this::configureInitParameters),
-                IgniteInternalException.class);
+                () -> startEmbeddedNode(
+                        testInfo,
+                        0,
+                        config -> ConfigDocumentFactory.parseString(config)
+                                
.withValueText("ignite.network.nodeFinder.netClusterNodes", "[ 
\"bad.host:1234\" ]")
+                                
.withValueText("ignite.network.nodeFinder.nameResolutionAttempts", "1")
+                                .render()
+                ),
+                IgniteInternalException.class
+        );
 
         IgniteInternalException actual = (IgniteInternalException) 
unwrapRootCause(throwable);
         assertEquals(ADDRESS_UNRESOLVED_ERR, actual.code());
         assertEquals("No network addresses resolved through any provided 
names", actual.getMessage());
     }
+
+    /**
+     * Verifies a situation when two nodes are started simultaneously, but one 
of them is stuck trying to resolve host names. We then
+     * check that no network threads are blocked while name resolution is in 
progress.
+     */
+    @Test
+    void testNameResolutionDoesNotBlockNetworkThreads(TestInfo testInfo) {
+        LogInspector watchdogLogInspector = 
LogInspector.create(FailureManager.class, true);
+
+        var blockedThreadsCounter = new AtomicInteger();
+
+        watchdogLogInspector.addHandler(
+                event -> {
+                    Throwable thrown = event.getThrown();
+
+                    return thrown != null && thrown.getMessage().contains("A 
critical thread is blocked");
+                },
+                blockedThreadsCounter::incrementAndGet
+        );
+
+        try {
+            // First, start the node that will get stuck trying to resolve 
host names.
+            CompletableFuture<Void> startBrokenNodeFuture = runAsync(() -> 
startEmbeddedNode(
+                    testInfo,
+                    0,
+                    config -> ConfigDocumentFactory.parseString(config)
+                            .withValueText("ignite.network.port", 
String.valueOf(DEFAULT_BASE_PORT))
+                            
.withValueText("ignite.network.nodeFinder.netClusterNodes", "[ 
\"bad.host:1234\" ]")
+                            
.withValueText("ignite.network.nodeFinder.nameResolutionAttempts", "3")
+                            .render()
+            ));
+
+            // Start a second node that will try to open a connection to the 
first node. It should start successfully.
+            startEmbeddedNode(
+                    testInfo,
+                    1,
+                    config -> ConfigDocumentFactory.parseString(config)
+                            .withValueText("ignite.network.port", 
String.valueOf(DEFAULT_BASE_PORT + 1))
+                            .withValueText(
+                                    
"ignite.network.nodeFinder.netClusterNodes",
+                                    String.format("[ \"localhost:%d\" ]", 
DEFAULT_BASE_PORT)
+                            )
+                            .render()
+            );
+
+            // The first node should fail to start after host name resolution 
timeout.
+            assertThat(
+                    startBrokenNodeFuture,
+                    
willThrowWithCauseOrSuppressed(IgniteInternalException.class,
+                            "No network addresses resolved through any 
provided names")
+            );
+        } finally {
+            watchdogLogInspector.stop();
+        }
+
+        // Verify that no critical threads were blocked during the test.
+        assertThat(blockedThreadsCounter.get(), is(0));
+    }
+
+    private void startEmbeddedNode(TestInfo testInfo, int nodeIndex, 
NodeBootstrapConfigUpdater nodeBootstrapConfigUpdater) {
+        cluster.startEmbeddedNode(testInfo, nodeIndex, 
getNodeBootstrapConfigTemplate(), nodeBootstrapConfigUpdater);
+    }
 }
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index c75a9600e81..4f5a5a51965 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -325,7 +325,7 @@ public class ItConnectionManagerTest extends 
BaseIgniteAbstractTest {
         IgniteInternalException exception = (IgniteInternalException) 
assertThrows(
                 IgniteInternalException.class,
                 () -> startManager(4000),
-                "Failed to start the connection manager: Cannot start server 
at address=, port=4000"
+                "Failed to start the connection manager: Cannot start server 
at address=0.0.0.0, port=4000"
         );
 
         assertEquals("IGN-NETWORK-2", exception.codeAsString());
@@ -557,11 +557,19 @@ public class ItConnectionManagerTest extends 
BaseIgniteAbstractTest {
         assertThat(bootstrapFactory.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
 
         try {
+            var localBindAddress = new InetSocketAddress(port);
+
+            var localNode = new ClusterNodeImpl(
+                    launchId,
+                    consistentId,
+                    new NetworkAddress(localBindAddress.getHostName(), port)
+            );
+
             var manager = new ConnectionManager(
                     cfg,
                     new SerializationService(registry, 
mock(UserObjectSerializationContext.class)),
-                    consistentId,
-                    launchId,
+                    localBindAddress,
+                    localNode,
                     bootstrapFactory,
                     new AllIdsAreFresh(),
                     withoutClusterId(),
@@ -572,11 +580,6 @@ public class ItConnectionManagerTest extends 
BaseIgniteAbstractTest {
             );
 
             manager.start();
-            manager.setLocalNode(new ClusterNodeImpl(
-                    launchId,
-                    consistentId,
-                    new 
NetworkAddress(manager.localBindAddress().getHostName(), port)
-            ));
 
             var wrapper = new ConnectionManagerWrapper(manager, 
bootstrapFactory, launchId);
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 6b19c8057e1..c54f6396128 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -34,7 +34,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,7 +42,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import org.apache.ignite.internal.failure.FailureProcessor;
@@ -64,7 +62,6 @@ import 
org.apache.ignite.internal.network.configuration.NetworkView;
 import org.apache.ignite.internal.network.configuration.SslConfigurationSchema;
 import org.apache.ignite.internal.network.configuration.SslView;
 import 
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
-import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.recovery.DescriptorAcquiry;
 import 
org.apache.ignite.internal.network.recovery.RecoveryAcceptorHandshakeManager;
 import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
@@ -112,14 +109,7 @@ public class ConnectionManager implements 
ChannelCreationListener {
     /** Message listeners. */
     private final List<Consumer<InNetworkObject>> listeners = new 
CopyOnWriteArrayList<>();
 
-    /** Node ephemeral ID. */
-    private final UUID nodeId;
-
-    /**
-     * Completed when local node is set; attempts to initiate a connection to 
this node from the outside will wait
-     * till it's completed.
-     */
-    private final CompletableFuture<InternalClusterNode> localNodeFuture = new 
CompletableFuture<>();
+    private final InternalClusterNode localNode;
 
     protected final NettyBootstrapFactory bootstrapFactory;
 
@@ -155,26 +145,12 @@ public class ConnectionManager implements 
ChannelCreationListener {
     /** Failure processor. */
     protected final FailureProcessor failureProcessor;
 
-    /**
-     * Constructor.
-     *
-     * @param networkConfiguration Network configuration.
-     * @param serializationService Serialization service.
-     * @param nodeName Node name.
-     * @param nodeId ID of this node.
-     * @param bootstrapFactory Bootstrap factory.
-     * @param staleIdDetector Detects stale member IDs.
-     * @param clusterIdSupplier Supplier of cluster ID.
-     * @param channelTypeRegistry {@link ChannelType} registry.
-     * @param productVersionSource Source of product version.
-     * @param topologyService Cluster topology service.
-     * @param failureProcessor Failure processor.
-     */
+    /** Constructor. */
     public ConnectionManager(
             NetworkView networkConfiguration,
             SerializationService serializationService,
-            String nodeName,
-            UUID nodeId,
+            InetSocketAddress localBindAddress,
+            InternalClusterNode localNode,
             NettyBootstrapFactory bootstrapFactory,
             StaleIdDetector staleIdDetector,
             ClusterIdSupplier clusterIdSupplier,
@@ -184,7 +160,7 @@ public class ConnectionManager implements 
ChannelCreationListener {
             FailureProcessor failureProcessor
     ) {
         this.serializationService = serializationService;
-        this.nodeId = nodeId;
+        this.localNode = localNode;
         this.bootstrapFactory = bootstrapFactory;
         this.staleIdDetector = staleIdDetector;
         this.clusterIdSupplier = clusterIdSupplier;
@@ -198,8 +174,8 @@ public class ConnectionManager implements 
ChannelCreationListener {
         clientSslContext = ssl.enabled() ? 
SslContextProvider.createClientSslContext(ssl) : null;
 
         this.server = new NettyServer(
-                networkConfiguration,
-                this::createAcceptorHandshakeManager,
+                localBindAddress,
+                this::newRecoveryAcceptorHandshakeManager,
                 this::onMessage,
                 serializationService,
                 bootstrapFactory,
@@ -216,7 +192,7 @@ public class ConnectionManager implements 
ChannelCreationListener {
                 1,
                 SECONDS,
                 new LinkedBlockingQueue<>(),
-                IgniteThreadFactory.create(nodeName, "connection-maintenance", 
LOG)
+                IgniteThreadFactory.create(localNode.name(), 
"connection-maintenance", LOG)
         );
         maintenanceExecutor.allowCoreThreadTimeOut(true);
 
@@ -420,7 +396,7 @@ public class ConnectionManager implements 
ChannelCreationListener {
         var client = new NettyClient(
                 address,
                 serializationService,
-                createInitiatorHandshakeManager(channelType.id()),
+                newRecoveryInitiatorHandshakeManager(channelType.id(), 
localNode),
                 this::onMessage,
                 clientSslContext
         );
@@ -493,12 +469,6 @@ public class ConnectionManager implements 
ChannelCreationListener {
         return stopped.get();
     }
 
-    private HandshakeManager createInitiatorHandshakeManager(short 
connectionId) {
-        InternalClusterNode localNode = 
Objects.requireNonNull(localNodeFuture.getNow(null), "localNode not set");
-
-        return newRecoveryInitiatorHandshakeManager(connectionId, localNode);
-    }
-
     /**
      * Factory method for overriding the handshake manager implementation in 
subclasses.
      */
@@ -521,14 +491,7 @@ public class ConnectionManager implements 
ChannelCreationListener {
         );
     }
 
-    private HandshakeManager createAcceptorHandshakeManager() {
-        // Do not just use localNodeFuture.join() to make sure the wait is 
time-limited.
-        InternalClusterNode localNode = waitForLocalNodeToBeSet();
-
-        return newRecoveryAcceptorHandshakeManager(localNode);
-    }
-
-    private RecoveryAcceptorHandshakeManager 
newRecoveryAcceptorHandshakeManager(InternalClusterNode localNode) {
+    private RecoveryAcceptorHandshakeManager 
newRecoveryAcceptorHandshakeManager() {
         return new RecoveryAcceptorHandshakeManager(
                 localNode,
                 FACTORY,
@@ -544,18 +507,6 @@ public class ConnectionManager implements 
ChannelCreationListener {
         );
     }
 
-    private InternalClusterNode waitForLocalNodeToBeSet() {
-        try {
-            return localNodeFuture.get(10, SECONDS);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new RuntimeException("Interrupted while waiting for local 
node to be set", e);
-        } catch (ExecutionException | TimeoutException e) {
-            throw new RuntimeException("Could not finish awaiting for local 
node", e);
-        }
-    }
-
     /**
      * Returns connection manager's {@link #server}.
      *
@@ -577,7 +528,7 @@ public class ConnectionManager implements 
ChannelCreationListener {
      * @return This node's id.
      */
     public UUID nodeId() {
-        return nodeId;
+        return localNode.id();
     }
 
     /**
@@ -691,11 +642,4 @@ public class ConnectionManager implements 
ChannelCreationListener {
 
         return nullCompletedFuture();
     }
-
-    /**
-     * Sets the local node. Only after this this manager becomes able to 
accept incoming connections.
-     */
-    public void setLocalNode(InternalClusterNode localNode) {
-        localNodeFuture.complete(localNode);
-    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index 624b72f685d..a696738459e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -31,6 +31,7 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ServerChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.ssl.SslContext;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -44,7 +45,6 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.network.NettyBootstrapFactory;
-import org.apache.ignite.internal.network.configuration.NetworkView;
 import org.apache.ignite.internal.network.configuration.SslConfigurationSchema;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import 
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
@@ -64,8 +64,8 @@ public class NettyServer {
     /** Bootstrap factory. */
     private final NettyBootstrapFactory bootstrapFactory;
 
-    /** Server socket configuration. */
-    private final NetworkView configuration;
+    /** Server socket address. */
+    private final InetSocketAddress bindAddress;
 
     /** Serialization service. */
     private final SerializationService serializationService;
@@ -99,7 +99,7 @@ public class NettyServer {
     /**
      * Constructor.
      *
-     * @param configuration Server configuration.
+     * @param bindAddress Server socket address.
      * @param handshakeManager Handshake manager supplier.
      * @param messageListener Message listener.
      * @param serializationService Serialization service.
@@ -107,14 +107,14 @@ public class NettyServer {
      * @param sslContext Server SSL context, {@code null} if SSL is not {@link 
SslConfigurationSchema#enabled}.
      */
     public NettyServer(
-            NetworkView configuration,
+            InetSocketAddress bindAddress,
             Supplier<HandshakeManager> handshakeManager,
             Consumer<InNetworkObject> messageListener,
             SerializationService serializationService,
             NettyBootstrapFactory bootstrapFactory,
             @Nullable SslContext sslContext
     ) {
-        this.configuration = configuration;
+        this.bindAddress = bindAddress;
         this.handshakeManager = handshakeManager;
         this.messageListener = messageListener;
         this.serializationService = serializationService;
@@ -157,31 +157,19 @@ public class NettyServer {
                 }
             });
 
-            int port = configuration.port();
-            String[] addresses = configuration.listenAddresses();
-
             var bindFuture = new CompletableFuture<Channel>();
 
-            ChannelFuture channelFuture;
-            if (addresses.length == 0) {
-                channelFuture = bootstrap.bind(port);
-            } else {
-                if (addresses.length > 1) {
-                    // TODO: IGNITE-22369 - support more than one listen 
address.
-                    throw new IgniteException(INTERNAL_ERR, "Only one listen 
address is allowed for now, but got " + List.of(addresses));
-                }
-
-                channelFuture = bootstrap.bind(addresses[0], port);
-            }
-
-            channelFuture.addListener((ChannelFuture future) -> {
+            bootstrap.bind(bindAddress).addListener((ChannelFuture future) -> {
                 if (future.isSuccess()) {
                     bindFuture.complete(future.channel());
                 } else if (future.isCancelled()) {
                     bindFuture.cancel(true);
                 } else {
-                    String address = addresses.length == 0 ? "" : addresses[0];
-                    String errorMessage = "Cannot start server at address=" + 
address + ", port=" + port;
+                    String errorMessage = String.format(
+                            "Cannot start server at address=%s, port=%d",
+                            bindAddress.getHostString(), bindAddress.getPort()
+                    );
+
                     bindFuture.completeExceptionally(new 
IgniteException(BIND_ERR, errorMessage, future.cause()));
                 }
             });
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
index c044f3c095b..4b7791f13c0 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -20,19 +20,19 @@ package org.apache.ignite.internal.network.scalecube;
 import static io.scalecube.cluster.membership.MembershipEvent.createAdded;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import io.scalecube.cluster.ClusterConfig;
 import io.scalecube.cluster.ClusterImpl;
 import io.scalecube.cluster.ClusterMessageHandler;
-import io.scalecube.cluster.Member;
 import io.scalecube.cluster.membership.MembershipEvent;
 import io.scalecube.cluster.metadata.MetadataCodec;
 import io.scalecube.net.Address;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -70,6 +70,7 @@ import 
org.apache.ignite.internal.network.serialization.UserObjectSerializationC
 import 
org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
 import org.apache.ignite.internal.version.IgniteProductVersionSource;
 import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NodeMetadata;
 
@@ -149,15 +150,23 @@ public class ScaleCubeClusterServiceFactory {
             public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
                 var serializationService = new 
SerializationService(serializationRegistry, userObjectSerialization);
 
-                UUID launchId = UUID.randomUUID();
-
                 NetworkView configView = networkConfiguration.value();
 
+                InetSocketAddress localBindAddress = 
localBindAddress(configView);
+
+                Address scalecubeLocalAddress = 
prepareAddress(localBindAddress);
+
+                var localNode = new ClusterNodeImpl(
+                        UUID.randomUUID(),
+                        consistentId,
+                        new NetworkAddress(scalecubeLocalAddress.host(), 
scalecubeLocalAddress.port())
+                );
+
                 ConnectionManager connectionMgr = new ConnectionManager(
                         configView,
                         serializationService,
-                        consistentId,
-                        launchId,
+                        localBindAddress,
+                        localNode,
                         nettyBootstrapFactory,
                         staleIds,
                         clusterIdSupplier,
@@ -171,8 +180,6 @@ public class ScaleCubeClusterServiceFactory {
                 connectionMgr.start();
                 messagingService.start();
 
-                Address scalecubeLocalAddress = 
prepareAddress(connectionMgr.localBindAddress());
-
                 topologyService.addEventHandler(new TopologyEventHandler() {
                     @Override
                     public void onDisappeared(InternalClusterNode member) {
@@ -187,37 +194,23 @@ public class ScaleCubeClusterServiceFactory {
                         messageFactory
                 );
 
-                ClusterConfig clusterConfig = 
clusterConfig(configView.membership());
-
-                NodeFinder finder = NodeFinderFactory.createNodeFinder(
-                        configView.nodeFinder(),
-                        nodeName(),
-                        connectionMgr.localBindAddress()
-                );
+                NodeFinder finder = 
NodeFinderFactory.createNodeFinder(configView.nodeFinder(), nodeName(), 
localBindAddress);
                 finder.start();
 
+                ClusterConfig clusterConfig = 
clusterConfig(configView.membership())
+                        .memberId(localNode.id().toString())
+                        .memberAlias(localNode.name())
+                        .transport(opts -> 
opts.transportFactory(transportConfig -> transport))
+                        .membership(opts -> 
opts.seedMembers(parseAddresses(finder.findNodes())))
+                        .metadataCodec(METADATA_CODEC);
+
                 ClusterImpl cluster = new ClusterImpl(clusterConfig)
                         .handler(cl -> new ClusterMessageHandler() {
                             @Override
                             public void onMembershipEvent(MembershipEvent 
event) {
                                 topologyService.onMembershipEvent(event);
                             }
-                        })
-                        .config(opts -> opts
-                                .memberId(launchId.toString())
-                                .memberAlias(consistentId)
-                                .metadataCodec(METADATA_CODEC)
-                        )
-                        .transport(opts -> 
opts.transportFactory(transportConfig -> transport))
-                        .membership(opts -> 
opts.seedMembers(parseAddresses(finder.findNodes())));
-
-                Member localMember = createLocalMember(scalecubeLocalAddress, 
launchId, clusterConfig);
-                InternalClusterNode localNode = new ClusterNodeImpl(
-                        UUID.fromString(localMember.id()),
-                        consistentId,
-                        new NetworkAddress(localMember.address().host(), 
localMember.address().port())
-                );
-                connectionMgr.setLocalNode(localNode);
+                        });
 
                 this.shutdownFuture = cluster.onShutdown().toFuture()
                         .thenAccept(v -> finder.close());
@@ -228,11 +221,8 @@ public class ScaleCubeClusterServiceFactory {
 
                 cluster.startAwait();
 
-                assert cluster.member().equals(localMember) : "Expected local 
member from cluster " + cluster.member()
-                        + " to be equal to the precomputed one " + localMember;
-
                 // emit an artificial event as if the local member has joined 
the topology (ScaleCube doesn't do that)
-                var localMembershipEvent = createAdded(cluster.member(), null, 
System.currentTimeMillis());
+                MembershipEvent localMembershipEvent = 
createAdded(cluster.member(), null, System.currentTimeMillis());
                 topologyService.onMembershipEvent(localMembershipEvent);
 
                 this.cluster = cluster;
@@ -240,6 +230,25 @@ public class ScaleCubeClusterServiceFactory {
                 return nullCompletedFuture();
             }
 
+            private InetSocketAddress localBindAddress(NetworkView configView) 
{
+                int port = configView.port();
+
+                String[] addresses = configView.listenAddresses();
+
+                if (addresses.length == 0) {
+                    return new InetSocketAddress(port);
+                } else {
+                    if (addresses.length > 1) {
+                        // TODO: IGNITE-22369 - support more than one listen 
address.
+                        throw new IgniteException(
+                                INTERNAL_ERR, "Only one listen address is 
allowed for now, but got " + Arrays.toString(addresses)
+                        );
+                    }
+
+                    return new InetSocketAddress(addresses[0], port);
+                }
+            }
+
             @Override
             public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
                 try {
@@ -315,23 +324,6 @@ public class ScaleCubeClusterServiceFactory {
         return Address.create(host, addr.getPort());
     }
 
-    // This is copied from ClusterImpl#creeateLocalMember() and adjusted to always use launchId as member ID.
-    private Member createLocalMember(Address address, UUID launchId, ClusterConfig config) {
-        int port = Optional.ofNullable(config.externalPort()).orElse(address.port());
-
-        // calculate local member cluster address
-        Address memberAddress =
-                Optional.ofNullable(config.externalHost())
-                        .map(host -> Address.create(host, port))
-                        .orElseGet(() -> Address.create(address.host(), port));
-
-        return new Member(
-                launchId.toString(),
-                config.memberAlias(),
-                memberAddress,
-                config.membershipConfig().namespace());
-    }
-
     /**
      * Returns ScaleCube's cluster configuration. Can be overridden in subclasses for finer control of the created {@link ClusterService}
      * instances.
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index 11d7ad11b80..ce2c13a4497 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -41,6 +41,7 @@ import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.time.Instant;
 import java.util.List;
@@ -633,7 +634,6 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest {
                 beforeHandshake
         );
         connectionManager.start();
-        connectionManager.setLocalNode(node);
 
         messagingService.setConnectionManager(connectionManager);
 
@@ -657,8 +657,8 @@ class DefaultMessagingServiceTest extends BaseIgniteAbstractTest {
             super(
                     networkConfig.value(),
                     serializationService,
-                    node.name(),
-                    node.id(),
+                    new InetSocketAddress(node.address().host(), node.address().port()),
+                    node,
                     bootstrapFactory,
                     staleIdDetector,
                     clusterIdSupplier,
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index b71bd4c93fa..6e3beff664c 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -48,8 +48,10 @@ import io.netty.channel.socket.nio.NioSocketChannel;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.ConnectException;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -59,6 +61,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.network.NettyBootstrapFactory;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.configuration.NetworkView;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.serialization.MessageDeserializer;
 import org.apache.ignite.internal.network.serialization.MessageMappingException;
@@ -227,7 +230,7 @@ public class NettyServerTest extends BaseIgniteAbstractTest {
         assertThat(bootstrapFactory.startAsync(new ComponentContext()), willCompleteSuccessfully());
 
         server = new NettyServer(
-                serverCfg.value(),
+                localBindAddress(serverCfg.value()),
                 () -> handshakeManager,
                 (message) -> {},
                 new SerializationService(registry, mock(UserObjectSerializationContext.class)),
@@ -324,7 +327,7 @@ public class NettyServerTest extends BaseIgniteAbstractTest {
         MessageSerializationRegistry registry = mock(MessageSerializationRegistry.class);
 
         var server = new NettyServer(
-                serverCfg.value(),
+                localBindAddress(serverCfg.value()),
                 this::mockHandshakeManager,
                 (message) -> {},
                 new SerializationService(registry, mock(UserObjectSerializationContext.class)),
@@ -343,6 +346,17 @@ public class NettyServerTest extends BaseIgniteAbstractTest {
         return server;
     }
 
+    private static InetSocketAddress localBindAddress(NetworkView configView) {
+        int port = configView.port();
+
+        String[] addresses = configView.listenAddresses();
+
+        // TODO: IGNITE-22369 - support more than one listen address.
+        assert addresses.length <= 1 : "Only one listen address is allowed for now, but got " + Arrays.toString(addresses);
+
+        return addresses.length == 0 ? new InetSocketAddress(port) : new InetSocketAddress(addresses[0], port);
+    }
+
     /** Server channel on top of the {@link EmbeddedChannel}. */
     private static class EmbeddedServerChannel extends EmbeddedChannel implements ServerChannel {
         // No-op.


Reply via email to