This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new aa787089d7e IGNITE-27799 Remove blocking local node creation (#7569)
aa787089d7e is described below
commit aa787089d7ea4aa69b28e3740d008b6eda2d0c8b
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Wed Feb 11 10:00:10 2026 +0200
IGNITE-27799 Remove blocking local node creation (#7569)
---
modules/network/build.gradle | 1 +
.../internal/network/ItStaticNodeFinderTest.java | 109 ++++++++++++++++-----
.../network/netty/ItConnectionManagerTest.java | 19 ++--
.../internal/network/netty/ConnectionManager.java | 78 +++------------
.../ignite/internal/network/netty/NettyServer.java | 36 +++----
.../scalecube/ScaleCubeClusterServiceFactory.java | 96 +++++++++---------
.../network/DefaultMessagingServiceTest.java | 6 +-
.../internal/network/netty/NettyServerTest.java | 18 +++-
8 files changed, 185 insertions(+), 178 deletions(-)
diff --git a/modules/network/build.gradle b/modules/network/build.gradle
index c1be1499a2e..fbfe99977b9 100644
--- a/modules/network/build.gradle
+++ b/modules/network/build.gradle
@@ -92,6 +92,7 @@ dependencies {
integrationTestImplementation libs.netty.handler
integrationTestImplementation libs.scalecube.cluster
integrationTestImplementation libs.javapoet
+ integrationTestImplementation libs.typesafe.config
}
tasks.withType(Test).configureEach {
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/ItStaticNodeFinderTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/ItStaticNodeFinderTest.java
index 18267e7ad6f..79f6225f73d 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/ItStaticNodeFinderTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/ItStaticNodeFinderTest.java
@@ -17,47 +17,112 @@
package org.apache.ignite.internal.network;
+import static
org.apache.ignite.internal.ClusterConfiguration.DEFAULT_BASE_PORT;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
import static
org.apache.ignite.lang.ErrorGroups.Network.ADDRESS_UNRESOLVED_ERR;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import com.typesafe.config.parser.ConfigDocumentFactory;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.NodeBootstrapConfigUpdater;
+import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.testframework.log4j2.LogInspector;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
-/**
- * Tests that node finder failure causes node shutdown.
- */
-class ItStaticNodeFinderTest extends ClusterPerClassIntegrationTest {
+class ItStaticNodeFinderTest extends ClusterPerTestIntegrationTest {
@Override
protected int initialNodes() {
- return 1;
- }
-
- @Override
- protected String getNodeBootstrapConfigTemplate() {
- return "ignite {\n"
- + " network: {\n"
- + " nodeFinder.netClusterNodes: [ \"bad.host:1234\" ]\n"
- + " },\n"
- + "}";
- }
-
- @Override
- protected boolean needInitializeCluster() {
- return false;
+ return 0;
}
+ /** Tests that node finder failure causes node shutdown. */
@Test
void testNodeShutdownOnNodeFinderFailure(TestInfo testInfo) {
Throwable throwable = assertThrowsWithCause(
- () -> CLUSTER.startAndInit(testInfo, initialNodes(),
cmgMetastoreNodes(), this::configureInitParameters),
- IgniteInternalException.class);
+ () -> startEmbeddedNode(
+ testInfo,
+ 0,
+ config -> ConfigDocumentFactory.parseString(config)
+
.withValueText("ignite.network.nodeFinder.netClusterNodes", "[
\"bad.host:1234\" ]")
+
.withValueText("ignite.network.nodeFinder.nameResolutionAttempts", "1")
+ .render()
+ ),
+ IgniteInternalException.class
+ );
IgniteInternalException actual = (IgniteInternalException)
unwrapRootCause(throwable);
assertEquals(ADDRESS_UNRESOLVED_ERR, actual.code());
assertEquals("No network addresses resolved through any provided
names", actual.getMessage());
}
+
+ /**
+ * Verifies a situation when two nodes are started simultaneously, but one
of them is stuck trying to resolve host names. We then
+ * check that no network threads are blocked while name resolution is in
progress.
+ */
+ @Test
+ void testNameResolutionDoesNotBlockNetworkThreads(TestInfo testInfo) {
+ LogInspector watchdogLogInspector =
LogInspector.create(FailureManager.class, true);
+
+ var blockedThreadsCounter = new AtomicInteger();
+
+ watchdogLogInspector.addHandler(
+ event -> {
+ Throwable thrown = event.getThrown();
+
+ return thrown != null && thrown.getMessage().contains("A
critical thread is blocked");
+ },
+ blockedThreadsCounter::incrementAndGet
+ );
+
+ try {
+ // First, start the node that will get stuck trying to resolve
host names.
+ CompletableFuture<Void> startBrokenNodeFuture = runAsync(() ->
startEmbeddedNode(
+ testInfo,
+ 0,
+ config -> ConfigDocumentFactory.parseString(config)
+ .withValueText("ignite.network.port",
String.valueOf(DEFAULT_BASE_PORT))
+
.withValueText("ignite.network.nodeFinder.netClusterNodes", "[
\"bad.host:1234\" ]")
+
.withValueText("ignite.network.nodeFinder.nameResolutionAttempts", "3")
+ .render()
+ ));
+
+ // Start a second node that will try to open a connection to the
first node. It should start successfully.
+ startEmbeddedNode(
+ testInfo,
+ 1,
+ config -> ConfigDocumentFactory.parseString(config)
+ .withValueText("ignite.network.port",
String.valueOf(DEFAULT_BASE_PORT + 1))
+ .withValueText(
+
"ignite.network.nodeFinder.netClusterNodes",
+ String.format("[ \"localhost:%d\" ]",
DEFAULT_BASE_PORT)
+ )
+ .render()
+ );
+
+ // The first node should fail to start after host name resolution
timeout.
+ assertThat(
+ startBrokenNodeFuture,
+
willThrowWithCauseOrSuppressed(IgniteInternalException.class,
+ "No network addresses resolved through any
provided names")
+ );
+ } finally {
+ watchdogLogInspector.stop();
+ }
+
+ // Verify that no critical threads were blocked during the test.
+ assertThat(blockedThreadsCounter.get(), is(0));
+ }
+
+ private void startEmbeddedNode(TestInfo testInfo, int nodeIndex,
NodeBootstrapConfigUpdater nodeBootstrapConfigUpdater) {
+ cluster.startEmbeddedNode(testInfo, nodeIndex,
getNodeBootstrapConfigTemplate(), nodeBootstrapConfigUpdater);
+ }
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index c75a9600e81..4f5a5a51965 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -325,7 +325,7 @@ public class ItConnectionManagerTest extends
BaseIgniteAbstractTest {
IgniteInternalException exception = (IgniteInternalException)
assertThrows(
IgniteInternalException.class,
() -> startManager(4000),
- "Failed to start the connection manager: Cannot start server
at address=, port=4000"
+ "Failed to start the connection manager: Cannot start server
at address=0.0.0.0, port=4000"
);
assertEquals("IGN-NETWORK-2", exception.codeAsString());
@@ -557,11 +557,19 @@ public class ItConnectionManagerTest extends
BaseIgniteAbstractTest {
assertThat(bootstrapFactory.startAsync(new ComponentContext()),
willCompleteSuccessfully());
try {
+ var localBindAddress = new InetSocketAddress(port);
+
+ var localNode = new ClusterNodeImpl(
+ launchId,
+ consistentId,
+ new NetworkAddress(localBindAddress.getHostName(), port)
+ );
+
var manager = new ConnectionManager(
cfg,
new SerializationService(registry,
mock(UserObjectSerializationContext.class)),
- consistentId,
- launchId,
+ localBindAddress,
+ localNode,
bootstrapFactory,
new AllIdsAreFresh(),
withoutClusterId(),
@@ -572,11 +580,6 @@ public class ItConnectionManagerTest extends
BaseIgniteAbstractTest {
);
manager.start();
- manager.setLocalNode(new ClusterNodeImpl(
- launchId,
- consistentId,
- new
NetworkAddress(manager.localBindAddress().getHostName(), port)
- ));
var wrapper = new ConnectionManagerWrapper(manager,
bootstrapFactory, launchId);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 6b19c8057e1..c54f6396128 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -34,7 +34,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,7 +42,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -64,7 +62,6 @@ import
org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.configuration.SslConfigurationSchema;
import org.apache.ignite.internal.network.configuration.SslView;
import
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
-import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.recovery.DescriptorAcquiry;
import
org.apache.ignite.internal.network.recovery.RecoveryAcceptorHandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
@@ -112,14 +109,7 @@ public class ConnectionManager implements
ChannelCreationListener {
/** Message listeners. */
private final List<Consumer<InNetworkObject>> listeners = new
CopyOnWriteArrayList<>();
- /** Node ephemeral ID. */
- private final UUID nodeId;
-
- /**
- * Completed when local node is set; attempts to initiate a connection to
this node from the outside will wait
- * till it's completed.
- */
- private final CompletableFuture<InternalClusterNode> localNodeFuture = new
CompletableFuture<>();
+ private final InternalClusterNode localNode;
protected final NettyBootstrapFactory bootstrapFactory;
@@ -155,26 +145,12 @@ public class ConnectionManager implements
ChannelCreationListener {
/** Failure processor. */
protected final FailureProcessor failureProcessor;
- /**
- * Constructor.
- *
- * @param networkConfiguration Network configuration.
- * @param serializationService Serialization service.
- * @param nodeName Node name.
- * @param nodeId ID of this node.
- * @param bootstrapFactory Bootstrap factory.
- * @param staleIdDetector Detects stale member IDs.
- * @param clusterIdSupplier Supplier of cluster ID.
- * @param channelTypeRegistry {@link ChannelType} registry.
- * @param productVersionSource Source of product version.
- * @param topologyService Cluster topology service.
- * @param failureProcessor Failure processor.
- */
+ /** Constructor. */
public ConnectionManager(
NetworkView networkConfiguration,
SerializationService serializationService,
- String nodeName,
- UUID nodeId,
+ InetSocketAddress localBindAddress,
+ InternalClusterNode localNode,
NettyBootstrapFactory bootstrapFactory,
StaleIdDetector staleIdDetector,
ClusterIdSupplier clusterIdSupplier,
@@ -184,7 +160,7 @@ public class ConnectionManager implements
ChannelCreationListener {
FailureProcessor failureProcessor
) {
this.serializationService = serializationService;
- this.nodeId = nodeId;
+ this.localNode = localNode;
this.bootstrapFactory = bootstrapFactory;
this.staleIdDetector = staleIdDetector;
this.clusterIdSupplier = clusterIdSupplier;
@@ -198,8 +174,8 @@ public class ConnectionManager implements
ChannelCreationListener {
clientSslContext = ssl.enabled() ?
SslContextProvider.createClientSslContext(ssl) : null;
this.server = new NettyServer(
- networkConfiguration,
- this::createAcceptorHandshakeManager,
+ localBindAddress,
+ this::newRecoveryAcceptorHandshakeManager,
this::onMessage,
serializationService,
bootstrapFactory,
@@ -216,7 +192,7 @@ public class ConnectionManager implements
ChannelCreationListener {
1,
SECONDS,
new LinkedBlockingQueue<>(),
- IgniteThreadFactory.create(nodeName, "connection-maintenance",
LOG)
+ IgniteThreadFactory.create(localNode.name(),
"connection-maintenance", LOG)
);
maintenanceExecutor.allowCoreThreadTimeOut(true);
@@ -420,7 +396,7 @@ public class ConnectionManager implements
ChannelCreationListener {
var client = new NettyClient(
address,
serializationService,
- createInitiatorHandshakeManager(channelType.id()),
+ newRecoveryInitiatorHandshakeManager(channelType.id(),
localNode),
this::onMessage,
clientSslContext
);
@@ -493,12 +469,6 @@ public class ConnectionManager implements
ChannelCreationListener {
return stopped.get();
}
- private HandshakeManager createInitiatorHandshakeManager(short
connectionId) {
- InternalClusterNode localNode =
Objects.requireNonNull(localNodeFuture.getNow(null), "localNode not set");
-
- return newRecoveryInitiatorHandshakeManager(connectionId, localNode);
- }
-
/**
* Factory method for overriding the handshake manager implementation in
subclasses.
*/
@@ -521,14 +491,7 @@ public class ConnectionManager implements
ChannelCreationListener {
);
}
- private HandshakeManager createAcceptorHandshakeManager() {
- // Do not just use localNodeFuture.join() to make sure the wait is
time-limited.
- InternalClusterNode localNode = waitForLocalNodeToBeSet();
-
- return newRecoveryAcceptorHandshakeManager(localNode);
- }
-
- private RecoveryAcceptorHandshakeManager
newRecoveryAcceptorHandshakeManager(InternalClusterNode localNode) {
+ private RecoveryAcceptorHandshakeManager
newRecoveryAcceptorHandshakeManager() {
return new RecoveryAcceptorHandshakeManager(
localNode,
FACTORY,
@@ -544,18 +507,6 @@ public class ConnectionManager implements
ChannelCreationListener {
);
}
- private InternalClusterNode waitForLocalNodeToBeSet() {
- try {
- return localNodeFuture.get(10, SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new RuntimeException("Interrupted while waiting for local
node to be set", e);
- } catch (ExecutionException | TimeoutException e) {
- throw new RuntimeException("Could not finish awaiting for local
node", e);
- }
- }
-
/**
* Returns connection manager's {@link #server}.
*
@@ -577,7 +528,7 @@ public class ConnectionManager implements
ChannelCreationListener {
* @return This node's id.
*/
public UUID nodeId() {
- return nodeId;
+ return localNode.id();
}
/**
@@ -691,11 +642,4 @@ public class ConnectionManager implements
ChannelCreationListener {
return nullCompletedFuture();
}
-
- /**
- * Sets the local node. Only after this this manager becomes able to
accept incoming connections.
- */
- public void setLocalNode(InternalClusterNode localNode) {
- localNodeFuture.complete(localNode);
- }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index 624b72f685d..a696738459e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -31,6 +31,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
@@ -44,7 +45,6 @@ import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
-import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.configuration.SslConfigurationSchema;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
@@ -64,8 +64,8 @@ public class NettyServer {
/** Bootstrap factory. */
private final NettyBootstrapFactory bootstrapFactory;
- /** Server socket configuration. */
- private final NetworkView configuration;
+ /** Server socket address. */
+ private final InetSocketAddress bindAddress;
/** Serialization service. */
private final SerializationService serializationService;
@@ -99,7 +99,7 @@ public class NettyServer {
/**
* Constructor.
*
- * @param configuration Server configuration.
+ * @param bindAddress Server socket address.
* @param handshakeManager Handshake manager supplier.
* @param messageListener Message listener.
* @param serializationService Serialization service.
@@ -107,14 +107,14 @@ public class NettyServer {
* @param sslContext Server SSL context, {@code null} if SSL is not {@link
SslConfigurationSchema#enabled}.
*/
public NettyServer(
- NetworkView configuration,
+ InetSocketAddress bindAddress,
Supplier<HandshakeManager> handshakeManager,
Consumer<InNetworkObject> messageListener,
SerializationService serializationService,
NettyBootstrapFactory bootstrapFactory,
@Nullable SslContext sslContext
) {
- this.configuration = configuration;
+ this.bindAddress = bindAddress;
this.handshakeManager = handshakeManager;
this.messageListener = messageListener;
this.serializationService = serializationService;
@@ -157,31 +157,19 @@ public class NettyServer {
}
});
- int port = configuration.port();
- String[] addresses = configuration.listenAddresses();
-
var bindFuture = new CompletableFuture<Channel>();
- ChannelFuture channelFuture;
- if (addresses.length == 0) {
- channelFuture = bootstrap.bind(port);
- } else {
- if (addresses.length > 1) {
- // TODO: IGNITE-22369 - support more than one listen
address.
- throw new IgniteException(INTERNAL_ERR, "Only one listen
address is allowed for now, but got " + List.of(addresses));
- }
-
- channelFuture = bootstrap.bind(addresses[0], port);
- }
-
- channelFuture.addListener((ChannelFuture future) -> {
+ bootstrap.bind(bindAddress).addListener((ChannelFuture future) -> {
if (future.isSuccess()) {
bindFuture.complete(future.channel());
} else if (future.isCancelled()) {
bindFuture.cancel(true);
} else {
- String address = addresses.length == 0 ? "" : addresses[0];
- String errorMessage = "Cannot start server at address=" +
address + ", port=" + port;
+ String errorMessage = String.format(
+ "Cannot start server at address=%s, port=%d",
+ bindAddress.getHostString(), bindAddress.getPort()
+ );
+
bindFuture.completeExceptionally(new
IgniteException(BIND_ERR, errorMessage, future.cause()));
}
});
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
index c044f3c095b..4b7791f13c0 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -20,19 +20,19 @@ package org.apache.ignite.internal.network.scalecube;
import static io.scalecube.cluster.membership.MembershipEvent.createAdded;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.ClusterMessageHandler;
-import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.metadata.MetadataCodec;
import io.scalecube.net.Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -70,6 +70,7 @@ import
org.apache.ignite.internal.network.serialization.UserObjectSerializationC
import
org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
import org.apache.ignite.internal.version.IgniteProductVersionSource;
import org.apache.ignite.internal.worker.CriticalWorkerRegistry;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeMetadata;
@@ -149,15 +150,23 @@ public class ScaleCubeClusterServiceFactory {
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
var serializationService = new
SerializationService(serializationRegistry, userObjectSerialization);
- UUID launchId = UUID.randomUUID();
-
NetworkView configView = networkConfiguration.value();
+ InetSocketAddress localBindAddress =
localBindAddress(configView);
+
+ Address scalecubeLocalAddress =
prepareAddress(localBindAddress);
+
+ var localNode = new ClusterNodeImpl(
+ UUID.randomUUID(),
+ consistentId,
+ new NetworkAddress(scalecubeLocalAddress.host(),
scalecubeLocalAddress.port())
+ );
+
ConnectionManager connectionMgr = new ConnectionManager(
configView,
serializationService,
- consistentId,
- launchId,
+ localBindAddress,
+ localNode,
nettyBootstrapFactory,
staleIds,
clusterIdSupplier,
@@ -171,8 +180,6 @@ public class ScaleCubeClusterServiceFactory {
connectionMgr.start();
messagingService.start();
- Address scalecubeLocalAddress =
prepareAddress(connectionMgr.localBindAddress());
-
topologyService.addEventHandler(new TopologyEventHandler() {
@Override
public void onDisappeared(InternalClusterNode member) {
@@ -187,37 +194,23 @@ public class ScaleCubeClusterServiceFactory {
messageFactory
);
- ClusterConfig clusterConfig =
clusterConfig(configView.membership());
-
- NodeFinder finder = NodeFinderFactory.createNodeFinder(
- configView.nodeFinder(),
- nodeName(),
- connectionMgr.localBindAddress()
- );
+ NodeFinder finder =
NodeFinderFactory.createNodeFinder(configView.nodeFinder(), nodeName(),
localBindAddress);
finder.start();
+ ClusterConfig clusterConfig =
clusterConfig(configView.membership())
+ .memberId(localNode.id().toString())
+ .memberAlias(localNode.name())
+ .transport(opts ->
opts.transportFactory(transportConfig -> transport))
+ .membership(opts ->
opts.seedMembers(parseAddresses(finder.findNodes())))
+ .metadataCodec(METADATA_CODEC);
+
ClusterImpl cluster = new ClusterImpl(clusterConfig)
.handler(cl -> new ClusterMessageHandler() {
@Override
public void onMembershipEvent(MembershipEvent
event) {
topologyService.onMembershipEvent(event);
}
- })
- .config(opts -> opts
- .memberId(launchId.toString())
- .memberAlias(consistentId)
- .metadataCodec(METADATA_CODEC)
- )
- .transport(opts ->
opts.transportFactory(transportConfig -> transport))
- .membership(opts ->
opts.seedMembers(parseAddresses(finder.findNodes())));
-
- Member localMember = createLocalMember(scalecubeLocalAddress,
launchId, clusterConfig);
- InternalClusterNode localNode = new ClusterNodeImpl(
- UUID.fromString(localMember.id()),
- consistentId,
- new NetworkAddress(localMember.address().host(),
localMember.address().port())
- );
- connectionMgr.setLocalNode(localNode);
+ });
this.shutdownFuture = cluster.onShutdown().toFuture()
.thenAccept(v -> finder.close());
@@ -228,11 +221,8 @@ public class ScaleCubeClusterServiceFactory {
cluster.startAwait();
- assert cluster.member().equals(localMember) : "Expected local
member from cluster " + cluster.member()
- + " to be equal to the precomputed one " + localMember;
-
// emit an artificial event as if the local member has joined
the topology (ScaleCube doesn't do that)
- var localMembershipEvent = createAdded(cluster.member(), null,
System.currentTimeMillis());
+ MembershipEvent localMembershipEvent =
createAdded(cluster.member(), null, System.currentTimeMillis());
topologyService.onMembershipEvent(localMembershipEvent);
this.cluster = cluster;
@@ -240,6 +230,25 @@ public class ScaleCubeClusterServiceFactory {
return nullCompletedFuture();
}
+ private InetSocketAddress localBindAddress(NetworkView configView)
{
+ int port = configView.port();
+
+ String[] addresses = configView.listenAddresses();
+
+ if (addresses.length == 0) {
+ return new InetSocketAddress(port);
+ } else {
+ if (addresses.length > 1) {
+ // TODO: IGNITE-22369 - support more than one listen
address.
+ throw new IgniteException(
+ INTERNAL_ERR, "Only one listen address is
allowed for now, but got " + Arrays.toString(addresses)
+ );
+ }
+
+ return new InetSocketAddress(addresses[0], port);
+ }
+ }
+
@Override
public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
try {
@@ -315,23 +324,6 @@ public class ScaleCubeClusterServiceFactory {
return Address.create(host, addr.getPort());
}
- // This is copied from ClusterImpl#creeateLocalMember() and adjusted to
always use launchId as member ID.
- private Member createLocalMember(Address address, UUID launchId,
ClusterConfig config) {
- int port =
Optional.ofNullable(config.externalPort()).orElse(address.port());
-
- // calculate local member cluster address
- Address memberAddress =
- Optional.ofNullable(config.externalHost())
- .map(host -> Address.create(host, port))
- .orElseGet(() -> Address.create(address.host(), port));
-
- return new Member(
- launchId.toString(),
- config.memberAlias(),
- memberAddress,
- config.membershipConfig().namespace());
- }
-
/**
* Returns ScaleCube's cluster configuration. Can be overridden in
subclasses for finer control of the created {@link ClusterService}
* instances.
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index 11d7ad11b80..ce2c13a4497 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -41,6 +41,7 @@ import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Instant;
import java.util.List;
@@ -633,7 +634,6 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
beforeHandshake
);
connectionManager.start();
- connectionManager.setLocalNode(node);
messagingService.setConnectionManager(connectionManager);
@@ -657,8 +657,8 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
super(
networkConfig.value(),
serializationService,
- node.name(),
- node.id(),
+ new InetSocketAddress(node.address().host(),
node.address().port()),
+ node,
bootstrapFactory,
staleIdDetector,
clusterIdSupplier,
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index b71bd4c93fa..6e3beff664c 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -48,8 +48,10 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ConnectException;
+import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;
+import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -59,6 +61,7 @@ import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.serialization.MessageDeserializer;
import
org.apache.ignite.internal.network.serialization.MessageMappingException;
@@ -227,7 +230,7 @@ public class NettyServerTest extends BaseIgniteAbstractTest
{
assertThat(bootstrapFactory.startAsync(new ComponentContext()),
willCompleteSuccessfully());
server = new NettyServer(
- serverCfg.value(),
+ localBindAddress(serverCfg.value()),
() -> handshakeManager,
(message) -> {},
new SerializationService(registry,
mock(UserObjectSerializationContext.class)),
@@ -324,7 +327,7 @@ public class NettyServerTest extends BaseIgniteAbstractTest
{
MessageSerializationRegistry registry =
mock(MessageSerializationRegistry.class);
var server = new NettyServer(
- serverCfg.value(),
+ localBindAddress(serverCfg.value()),
this::mockHandshakeManager,
(message) -> {},
new SerializationService(registry,
mock(UserObjectSerializationContext.class)),
@@ -343,6 +346,17 @@ public class NettyServerTest extends
BaseIgniteAbstractTest {
return server;
}
+ private static InetSocketAddress localBindAddress(NetworkView configView) {
+ int port = configView.port();
+
+ String[] addresses = configView.listenAddresses();
+
+ // TODO: IGNITE-22369 - support more than one listen address.
+ assert addresses.length <= 1 : "Only one listen address is allowed for
now, but got " + Arrays.toString(addresses);
+
+ return addresses.length == 0 ? new InetSocketAddress(port) : new
InetSocketAddress(addresses[0], port);
+ }
+
/** Server channel on top of the {@link EmbeddedChannel}. */
private static class EmbeddedServerChannel extends EmbeddedChannel
implements ServerChannel {
// No-op.