This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new fedaaa260d6 IGNITE-27290 Add new components to handshake managers
(#7191)
fedaaa260d6 is described below
commit fedaaa260d63fd74712960f51abb2317deb08c28
Author: Ivan Bessonov <[email protected]>
AuthorDate: Thu Dec 11 11:40:45 2025 +0300
IGNITE-27290 Add new components to handshake managers (#7191)
---
modules/network/build.gradle | 2 +
.../network/netty/ItConnectionManagerTest.java | 6 +-
.../internal/network/netty/ConnectionManager.java | 122 +++++++++------------
.../recovery/RecoveryAcceptorHandshakeManager.java | 10 +-
.../RecoveryInitiatorHandshakeManager.java | 16 ++-
.../RecoveryInitiatorHandshakeManagerFactory.java | 40 -------
.../scalecube/ScaleCubeClusterServiceFactory.java | 4 +-
.../network/DefaultMessagingServiceTest.java | 100 ++++++++++-------
.../network/netty/RecoveryHandshakeTest.java | 12 +-
.../RecoveryAcceptorHandshakeManagerTest.java | 7 +-
.../RecoveryInitiatorHandshakeManagerTest.java | 9 +-
11 files changed, 167 insertions(+), 161 deletions(-)
diff --git a/modules/network/build.gradle b/modules/network/build.gradle
index f96afa97f71..6ab6851685d 100644
--- a/modules/network/build.gradle
+++ b/modules/network/build.gradle
@@ -54,6 +54,7 @@ dependencies {
testImplementation project(':ignite-network-annotation-processor')
testImplementation testFixtures(project(':ignite-configuration'))
testImplementation testFixtures(project(':ignite-core'))
+ testImplementation testFixtures(project(':ignite-failure-handler'))
testImplementation libs.jmh.core
testImplementation(libs.kryo) {
//IDEA test runner don't apply Gradle dependency resolve strategy,
this is just not implemented
@@ -83,6 +84,7 @@ dependencies {
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation
testFixtures(project(':ignite-configuration'))
integrationTestImplementation testFixtures(project(':ignite-runner'))
+ integrationTestImplementation
testFixtures(project(':ignite-failure-handler:'))
integrationTestImplementation libs.compileTesting
integrationTestImplementation libs.netty.handler
integrationTestImplementation libs.scalecube.cluster
diff --git
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 82961fea365..c75a9600e81 100644
---
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -60,6 +60,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.future.OrderingFuture;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.manager.ComponentContext;
@@ -69,6 +70,7 @@ import
org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
+import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.handshake.HandshakeException;
@@ -564,7 +566,9 @@ public class ItConnectionManagerTest extends
BaseIgniteAbstractTest {
new AllIdsAreFresh(),
withoutClusterId(),
defaultChannelTypeRegistry(),
- new DefaultIgniteProductVersionSource()
+ new DefaultIgniteProductVersionSource(),
+ mock(TopologyService.class),
+ new NoOpFailureManager()
);
manager.start();
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 37b3ceb068c..55ce2153bdd 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.future.OrderingFuture;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -58,6 +59,7 @@ import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.RecipientLeftException;
+import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.configuration.SslConfigurationSchema;
import org.apache.ignite.internal.network.configuration.SslView;
@@ -68,7 +70,6 @@ import
org.apache.ignite.internal.network.recovery.RecoveryAcceptorHandshakeMana
import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import
org.apache.ignite.internal.network.recovery.RecoveryInitiatorHandshakeManager;
-import
org.apache.ignite.internal.network.recovery.RecoveryInitiatorHandshakeManagerFactory;
import org.apache.ignite.internal.network.recovery.StaleIdDetector;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.network.ssl.SslContextProvider;
@@ -83,7 +84,7 @@ import org.jetbrains.annotations.TestOnly;
*/
public class ConnectionManager implements ChannelCreationListener {
/** Message factory. */
- private static final NetworkMessagesFactory FACTORY = new
NetworkMessagesFactory();
+ protected static final NetworkMessagesFactory FACTORY = new
NetworkMessagesFactory();
/** Logger. */
private static final IgniteLogger LOG =
Loggers.forClass(ConnectionManager.class);
@@ -120,74 +121,39 @@ public class ConnectionManager implements
ChannelCreationListener {
*/
private final CompletableFuture<InternalClusterNode> localNodeFuture = new
CompletableFuture<>();
- private final NettyBootstrapFactory bootstrapFactory;
+ protected final NettyBootstrapFactory bootstrapFactory;
/** Used to detect that a peer uses a stale ID. */
- private final StaleIdDetector staleIdDetector;
+ protected final StaleIdDetector staleIdDetector;
- private final ClusterIdSupplier clusterIdSupplier;
-
- /** Factory producing {@link RecoveryInitiatorHandshakeManager} instances.
*/
- private final @Nullable RecoveryInitiatorHandshakeManagerFactory
initiatorHandshakeManagerFactory;
+ protected final ClusterIdSupplier clusterIdSupplier;
/** Start flag. */
private final AtomicBoolean started = new AtomicBoolean(false);
- private final AtomicBoolean stopping = new AtomicBoolean(false);
+ protected final AtomicBoolean stopping = new AtomicBoolean(false);
/** Stop flag. */
private final AtomicBoolean stopped = new AtomicBoolean(false);
/** Recovery descriptor provider. */
- private final RecoveryDescriptorProvider descriptorProvider = new
DefaultRecoveryDescriptorProvider();
+ protected final RecoveryDescriptorProvider descriptorProvider = new
DefaultRecoveryDescriptorProvider();
/** Thread pool used for connection management tasks (like disposing
recovery descriptors on node left or on stop). */
private final ExecutorService connectionMaintenanceExecutor;
private final ChannelTypeRegistry channelTypeRegistry;
- private final IgniteProductVersionSource productVersionSource;
+ protected final IgniteProductVersionSource productVersionSource;
/** {@code null} if SSL is not {@link SslConfigurationSchema#enabled}. */
private final @Nullable SslContext clientSslContext;
- /**
- * Constructor.
- *
- * @param networkConfiguration Network configuration.
- * @param serializationService Serialization service.
- * @param nodeName Node name.
- * @param nodeId ID of this node.
- * @param bootstrapFactory Bootstrap factory.
- * @param staleIdDetector Detects stale member IDs.
- * @param clusterIdSupplier Supplier of cluster ID.
- * @param channelTypeRegistry {@link ChannelType} registry.
- * @param productVersionSource Source of product version.
- */
- public ConnectionManager(
- NetworkView networkConfiguration,
- SerializationService serializationService,
- String nodeName,
- UUID nodeId,
- NettyBootstrapFactory bootstrapFactory,
- StaleIdDetector staleIdDetector,
- ClusterIdSupplier clusterIdSupplier,
- ChannelTypeRegistry channelTypeRegistry,
- IgniteProductVersionSource productVersionSource
- ) {
- this(
- networkConfiguration,
- serializationService,
- nodeName,
- nodeId,
- bootstrapFactory,
- staleIdDetector,
- clusterIdSupplier,
- null,
- channelTypeRegistry,
- productVersionSource
- );
- }
+ /** Cluster topology service. */
+ protected final TopologyService topologyService;
+
+ /** Failure processor. */
+ protected final FailureProcessor failureProcessor;
/**
* Constructor.
@@ -199,9 +165,10 @@ public class ConnectionManager implements
ChannelCreationListener {
* @param bootstrapFactory Bootstrap factory.
* @param staleIdDetector Detects stale member IDs.
* @param clusterIdSupplier Supplier of cluster ID.
- * @param initiatorHandshakeManagerFactory Factory for {@link
RecoveryInitiatorHandshakeManager} instances.
* @param channelTypeRegistry {@link ChannelType} registry.
* @param productVersionSource Source of product version.
+ * @param topologyService Cluster topology service.
+ * @param failureProcessor Failure processor.
*/
public ConnectionManager(
NetworkView networkConfiguration,
@@ -211,18 +178,20 @@ public class ConnectionManager implements
ChannelCreationListener {
NettyBootstrapFactory bootstrapFactory,
StaleIdDetector staleIdDetector,
ClusterIdSupplier clusterIdSupplier,
- @Nullable RecoveryInitiatorHandshakeManagerFactory
initiatorHandshakeManagerFactory,
ChannelTypeRegistry channelTypeRegistry,
- IgniteProductVersionSource productVersionSource
+ IgniteProductVersionSource productVersionSource,
+ TopologyService topologyService,
+ FailureProcessor failureProcessor
) {
this.serializationService = serializationService;
this.nodeId = nodeId;
this.bootstrapFactory = bootstrapFactory;
this.staleIdDetector = staleIdDetector;
this.clusterIdSupplier = clusterIdSupplier;
- this.initiatorHandshakeManagerFactory =
initiatorHandshakeManagerFactory;
this.channelTypeRegistry = channelTypeRegistry;
this.productVersionSource = productVersionSource;
+ this.topologyService = topologyService;
+ this.failureProcessor = failureProcessor;
SslView ssl = networkConfiguration.ssl();
@@ -525,33 +494,41 @@ public class ConnectionManager implements
ChannelCreationListener {
private HandshakeManager createInitiatorHandshakeManager(short
connectionId) {
InternalClusterNode localNode =
Objects.requireNonNull(localNodeFuture.getNow(null), "localNode not set");
- if (initiatorHandshakeManagerFactory == null) {
- return new RecoveryInitiatorHandshakeManager(
- localNode,
- connectionId,
- descriptorProvider,
- bootstrapFactory.handshakeEventLoopSwitcher(),
- staleIdDetector,
- clusterIdSupplier,
- this,
- stopping::get,
- productVersionSource
- );
- }
+ return newRecoveryInitiatorHandshakeManager(connectionId, localNode);
+ }
- return initiatorHandshakeManagerFactory.create(
+ /**
+ * Factory method for overriding the handshake manager implementation in
subclasses.
+ */
+ protected RecoveryInitiatorHandshakeManager
newRecoveryInitiatorHandshakeManager(
+ short connectionId,
+ InternalClusterNode localNode
+ ) {
+ return new RecoveryInitiatorHandshakeManager(
localNode,
connectionId,
- descriptorProvider
+ descriptorProvider,
+ bootstrapFactory.handshakeEventLoopSwitcher(),
+ staleIdDetector,
+ clusterIdSupplier,
+ this,
+ stopping::get,
+ productVersionSource,
+ topologyService,
+ failureProcessor
);
}
private HandshakeManager createAcceptorHandshakeManager() {
// Do not just use localNodeFuture.join() to make sure the wait is
time-limited.
- waitForLocalNodeToBeSet();
+ InternalClusterNode localNode = waitForLocalNodeToBeSet();
+ return newRecoveryAcceptorHandshakeManager(localNode);
+ }
+
+ private RecoveryAcceptorHandshakeManager
newRecoveryAcceptorHandshakeManager(InternalClusterNode localNode) {
return new RecoveryAcceptorHandshakeManager(
- localNodeFuture.join(),
+ localNode,
FACTORY,
descriptorProvider,
bootstrapFactory.handshakeEventLoopSwitcher(),
@@ -559,13 +536,14 @@ public class ConnectionManager implements
ChannelCreationListener {
clusterIdSupplier,
this,
stopping::get,
- productVersionSource
+ productVersionSource,
+ topologyService
);
}
- private void waitForLocalNodeToBeSet() {
+ private InternalClusterNode waitForLocalNodeToBeSet() {
try {
- localNodeFuture.get(10, SECONDS);
+ return localNodeFuture.get(10, SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
index 1a3ce37a28f..ceef409062b 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
+import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
@@ -105,6 +106,10 @@ public class RecoveryAcceptorHandshakeManager implements
HandshakeManager {
/** Recovery descriptor. */
private RecoveryDescriptor recoveryDescriptor;
+ /** Cluster topology service. */
+ @SuppressWarnings("FieldCanBeLocal")
+ private final TopologyService topologyService;
+
/**
* Constructor.
*
@@ -113,6 +118,7 @@ public class RecoveryAcceptorHandshakeManager implements
HandshakeManager {
* @param recoveryDescriptorProvider Recovery descriptor provider.
* @param stopping Defines whether the corresponding connection manager is
stopping.
* @param productVersionSource Source of product version.
+ * @param topologyService Cluster topology service.
*/
public RecoveryAcceptorHandshakeManager(
InternalClusterNode localNode,
@@ -123,7 +129,8 @@ public class RecoveryAcceptorHandshakeManager implements
HandshakeManager {
ClusterIdSupplier clusterIdSupplier,
ChannelCreationListener channelCreationListener,
BooleanSupplier stopping,
- IgniteProductVersionSource productVersionSource
+ IgniteProductVersionSource productVersionSource,
+ TopologyService topologyService
) {
this.localNode = localNode;
this.messageFactory = messageFactory;
@@ -133,6 +140,7 @@ public class RecoveryAcceptorHandshakeManager implements
HandshakeManager {
this.clusterIdSupplier = clusterIdSupplier;
this.stopping = stopping;
this.productVersionSource = productVersionSource;
+ this.topologyService = topologyService;
this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
if (throwable != null) {
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
index 21c9d93559e..db34a4ead3a 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
+import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -42,6 +43,7 @@ import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
import org.apache.ignite.internal.network.RecipientLeftException;
+import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
import org.apache.ignite.internal.network.handshake.HandshakeException;
@@ -114,6 +116,14 @@ public class RecoveryInitiatorHandshakeManager implements
HandshakeManager {
/** Recovery descriptor. */
private RecoveryDescriptor recoveryDescriptor;
+ /** Cluster topology service. */
+ @SuppressWarnings("FieldCanBeLocal")
+ private final TopologyService topologyService;
+
+ /** Failure processor. */
+ @SuppressWarnings("FieldCanBeLocal")
+ private final FailureProcessor failureProcessor;
+
/**
* Constructor.
*
@@ -131,7 +141,9 @@ public class RecoveryInitiatorHandshakeManager implements
HandshakeManager {
ClusterIdSupplier clusterIdSupplier,
ChannelCreationListener channelCreationListener,
BooleanSupplier stopping,
- IgniteProductVersionSource productVersionSource
+ IgniteProductVersionSource productVersionSource,
+ TopologyService topologyService,
+ FailureProcessor failureProcessor
) {
this.localNode = localNode;
this.connectionId = connectionId;
@@ -141,6 +153,8 @@ public class RecoveryInitiatorHandshakeManager implements
HandshakeManager {
this.clusterIdSupplier = clusterIdSupplier;
this.stopping = stopping;
this.productVersionSource = productVersionSource;
+ this.topologyService = topologyService;
+ this.failureProcessor = failureProcessor;
localHandshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
if (throwable != null) {
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerFactory.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerFactory.java
deleted file mode 100644
index d0e92c66657..00000000000
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.network.recovery;
-
-import org.apache.ignite.internal.network.InternalClusterNode;
-
-/**
- * Factory producing {@link RecoveryInitiatorHandshakeManager} instances.
- */
-@FunctionalInterface
-public interface RecoveryInitiatorHandshakeManagerFactory {
- /**
- * Produces a {@link RecoveryInitiatorHandshakeManager} instance.
- *
- * @param localNode This node.
- * @param connectionId ID of the connection.
- * @param recoveryDescriptorProvider Provider of recovery descriptors to
be used.
- * @return Created manager.
- */
- RecoveryInitiatorHandshakeManager create(
- InternalClusterNode localNode,
- short connectionId,
- RecoveryDescriptorProvider recoveryDescriptorProvider
- );
-}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
index debf3079834..c044f3c095b 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -162,7 +162,9 @@ public class ScaleCubeClusterServiceFactory {
staleIds,
clusterIdSupplier,
channelTypeRegistry,
- productVersionSource
+ productVersionSource,
+ topologyService,
+ failureProcessor
);
this.connectionMgr = connectionMgr;
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index 3d87670c2b2..11d7ad11b80 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -61,6 +61,7 @@ import java.util.regex.Pattern;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.messages.AllTypesMessageImpl;
@@ -72,9 +73,7 @@ import
org.apache.ignite.internal.network.messages.TestMessageTypes;
import org.apache.ignite.internal.network.messages.TestMessagesFactory;
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.internal.network.recovery.AllIdsAreFresh;
-import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import
org.apache.ignite.internal.network.recovery.RecoveryInitiatorHandshakeManager;
-import
org.apache.ignite.internal.network.recovery.RecoveryInitiatorHandshakeManagerFactory;
import org.apache.ignite.internal.network.recovery.StaleIdDetector;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
@@ -624,17 +623,14 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
NettyBootstrapFactory bootstrapFactory = new
NettyBootstrapFactory(networkConfig, eventLoopGroupNamePrefix);
assertThat(bootstrapFactory.startAsync(new ComponentContext()),
willCompleteSuccessfully());
- ConnectionManager connectionManager = new ConnectionManager(
- networkConfig.value(),
+ ConnectionManager connectionManager = new TestConnectionManager(
+ networkConfig,
serializationService,
- node.name(),
- node.id(),
+ node,
bootstrapFactory,
staleIdDetector,
clusterIdSupplier,
- initiatorHandshakeManagerFactoryAdding(beforeHandshake,
bootstrapFactory, staleIdDetector, clusterIdSupplier),
- channelTypeRegistry,
- new DefaultIgniteProductVersionSource()
+ beforeHandshake
);
connectionManager.start();
connectionManager.setLocalNode(node);
@@ -646,39 +642,61 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
return new Services(connectionManager, messagingService,
bootstrapFactory);
}
- private RecoveryInitiatorHandshakeManagerFactory
initiatorHandshakeManagerFactoryAdding(
- Runnable beforeHandshake,
- NettyBootstrapFactory bootstrapFactory,
- StaleIdDetector staleIdDetector,
- ClusterIdSupplier clusterIdSupplier
- ) {
- return new RecoveryInitiatorHandshakeManagerFactory() {
- @Override
- public RecoveryInitiatorHandshakeManager create(
- InternalClusterNode localNode,
- short connectionId,
- RecoveryDescriptorProvider recoveryDescriptorProvider
+ private class TestConnectionManager extends ConnectionManager {
+ private final Runnable beforeHandshake;
+
+ private TestConnectionManager(
+ NetworkConfiguration networkConfig,
+ SerializationService serializationService,
+ InternalClusterNode node,
+ NettyBootstrapFactory bootstrapFactory,
+ StaleIdDetector staleIdDetector,
+ ClusterIdSupplier clusterIdSupplier,
+ Runnable beforeHandshake
+ ) {
+ super(
+ networkConfig.value(),
+ serializationService,
+ node.name(),
+ node.id(),
+ bootstrapFactory,
+ staleIdDetector,
+ clusterIdSupplier,
+ DefaultMessagingServiceTest.this.channelTypeRegistry,
+ new DefaultIgniteProductVersionSource(),
+ DefaultMessagingServiceTest.this.topologyService,
+ DefaultMessagingServiceTest.this.failureProcessor
+ );
+
+ this.beforeHandshake = beforeHandshake;
+ }
+
+ @Override
+ protected RecoveryInitiatorHandshakeManager
newRecoveryInitiatorHandshakeManager(
+ short connectionId,
+ InternalClusterNode localNode
+ ) {
+ return new RecoveryInitiatorHandshakeManager(
+ localNode,
+ connectionId,
+ descriptorProvider,
+ bootstrapFactory.handshakeEventLoopSwitcher(),
+ staleIdDetector,
+ clusterIdSupplier,
+ channel -> {},
+ () -> false,
+ new DefaultIgniteProductVersionSource(),
+ this.topologyService,
+ new NoOpFailureManager()
) {
- return new RecoveryInitiatorHandshakeManager(
- localNode,
- connectionId,
- recoveryDescriptorProvider,
- bootstrapFactory.handshakeEventLoopSwitcher(),
- staleIdDetector,
- clusterIdSupplier,
- channel -> {},
- () -> false,
- new DefaultIgniteProductVersionSource()
- ) {
- @Override
- protected void finishHandshake() {
- beforeHandshake.run();
-
- super.finishHandshake();
- }
- };
- }
- };
+ @Override
+ protected void finishHandshake() {
+ beforeHandshake.run();
+
+ super.finishHandshake();
+ }
+ };
+ }
}
private static String localHostName() {
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index 24b755118e7..c5f85e2233a 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
@@ -38,12 +39,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
+import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import
org.apache.ignite.internal.network.handshake.NoOpHandshakeEventLoopSwitcher;
import org.apache.ignite.internal.network.messages.TestMessage;
@@ -101,6 +104,8 @@ public class RecoveryHandshakeTest extends
BaseIgniteAbstractTest {
private final ClusterIdSupplier clusterIdSupplier = new
ConstantClusterIdSupplier(UUID.randomUUID());
+ protected final TopologyService topologyService =
mock(TopologyService.class);
+
@Test
public void testHandshake() throws Exception {
RecoveryDescriptorProvider initiatorRecovery =
createRecoveryDescriptorProvider();
@@ -769,7 +774,9 @@ public class RecoveryHandshakeTest extends
BaseIgniteAbstractTest {
clusterIdSupplier,
channel -> {},
() -> false,
- new DefaultIgniteProductVersionSource()
+ new DefaultIgniteProductVersionSource(),
+ topologyService,
+ new NoOpFailureManager()
);
}
@@ -802,7 +809,8 @@ public class RecoveryHandshakeTest extends
BaseIgniteAbstractTest {
clusterIdSupplier,
channel -> {},
() -> false,
- new DefaultIgniteProductVersionSource()
+ new DefaultIgniteProductVersionSource(),
+ topologyService
);
}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
index f15ebf5ac14..80187692436 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.OutNetworkObject;
+import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import
org.apache.ignite.internal.network.handshake.NoOpHandshakeEventLoopSwitcher;
import org.apache.ignite.internal.network.netty.ChannelCreationListener;
@@ -102,6 +103,9 @@ class RecoveryAcceptorHandshakeManagerTest extends
HandshakeManagerTest {
@Captor
private ArgumentCaptor<OutNetworkObject> sentMessageCaptor;
+ @Mock
+ private TopologyService topologyService;
+
private final RecoveryDescriptor recoveryDescriptor = new
RecoveryDescriptor(100);
private final AtomicBoolean acceptorHandshakeManagerStopping = new
AtomicBoolean(false);
@@ -175,7 +179,8 @@ class RecoveryAcceptorHandshakeManagerTest extends
HandshakeManagerTest {
new ConstantClusterIdSupplier(CORRECT_CLUSTER_ID),
channelCreationListener,
stopping,
- new DefaultIgniteProductVersionSource()
+ new DefaultIgniteProductVersionSource(),
+ topologyService
);
manager.onInit(context);
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
index e1285ebaa6b..6458e900aa0 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
@@ -46,11 +46,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
+import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import
org.apache.ignite.internal.network.handshake.NoOpHandshakeEventLoopSwitcher;
@@ -123,6 +125,9 @@ class RecoveryInitiatorHandshakeManagerTest extends
HandshakeManagerTest {
private final AtomicBoolean initiatorHandshakeManagerStopping = new
AtomicBoolean(false);
+ @Mock
+ private TopologyService topologyService;
+
@BeforeEach
void initMocks() {
lenient().when(thisContext.channel()).thenReturn(thisChannel);
@@ -189,7 +194,9 @@ class RecoveryInitiatorHandshakeManagerTest extends
HandshakeManagerTest {
new ConstantClusterIdSupplier(CORRECT_CLUSTER_ID),
channelCreationListener,
stopping,
- new DefaultIgniteProductVersionSource()
+ new DefaultIgniteProductVersionSource(),
+ topologyService,
+ new NoOpFailureManager()
);
manager.onInit(thisContext);