This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new fedaaa260d6 IGNITE-27290 Add new components to handshake managers 
(#7191)
fedaaa260d6 is described below

commit fedaaa260d63fd74712960f51abb2317deb08c28
Author: Ivan Bessonov <[email protected]>
AuthorDate: Thu Dec 11 11:40:45 2025 +0300

    IGNITE-27290 Add new components to handshake managers (#7191)
---
 modules/network/build.gradle                       |   2 +
 .../network/netty/ItConnectionManagerTest.java     |   6 +-
 .../internal/network/netty/ConnectionManager.java  | 122 +++++++++------------
 .../recovery/RecoveryAcceptorHandshakeManager.java |  10 +-
 .../RecoveryInitiatorHandshakeManager.java         |  16 ++-
 .../RecoveryInitiatorHandshakeManagerFactory.java  |  40 -------
 .../scalecube/ScaleCubeClusterServiceFactory.java  |   4 +-
 .../network/DefaultMessagingServiceTest.java       | 100 ++++++++++-------
 .../network/netty/RecoveryHandshakeTest.java       |  12 +-
 .../RecoveryAcceptorHandshakeManagerTest.java      |   7 +-
 .../RecoveryInitiatorHandshakeManagerTest.java     |   9 +-
 11 files changed, 167 insertions(+), 161 deletions(-)

diff --git a/modules/network/build.gradle b/modules/network/build.gradle
index f96afa97f71..6ab6851685d 100644
--- a/modules/network/build.gradle
+++ b/modules/network/build.gradle
@@ -54,6 +54,7 @@ dependencies {
     testImplementation project(':ignite-network-annotation-processor')
     testImplementation testFixtures(project(':ignite-configuration'))
     testImplementation testFixtures(project(':ignite-core'))
+    testImplementation testFixtures(project(':ignite-failure-handler'))
     testImplementation libs.jmh.core
     testImplementation(libs.kryo) {
         //IDEA test runner don't apply Gradle dependency resolve strategy, 
this is just not implemented
@@ -83,6 +84,7 @@ dependencies {
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation 
testFixtures(project(':ignite-configuration'))
     integrationTestImplementation testFixtures(project(':ignite-runner'))
+    integrationTestImplementation 
testFixtures(project(':ignite-failure-handler'))
     integrationTestImplementation libs.compileTesting
     integrationTestImplementation libs.netty.handler
     integrationTestImplementation libs.scalecube.cluster
diff --git 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 82961fea365..c75a9600e81 100644
--- 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -60,6 +60,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.future.OrderingFuture;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.manager.ComponentContext;
@@ -69,6 +70,7 @@ import 
org.apache.ignite.internal.network.NettyBootstrapFactory;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
+import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.configuration.NetworkView;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
@@ -564,7 +566,9 @@ public class ItConnectionManagerTest extends 
BaseIgniteAbstractTest {
                     new AllIdsAreFresh(),
                     withoutClusterId(),
                     defaultChannelTypeRegistry(),
-                    new DefaultIgniteProductVersionSource()
+                    new DefaultIgniteProductVersionSource(),
+                    mock(TopologyService.class),
+                    new NoOpFailureManager()
             );
 
             manager.start();
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 37b3ceb068c..55ce2153bdd 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
+import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.future.OrderingFuture;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -58,6 +59,7 @@ import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.NettyBootstrapFactory;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.RecipientLeftException;
+import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.network.configuration.NetworkView;
 import org.apache.ignite.internal.network.configuration.SslConfigurationSchema;
 import org.apache.ignite.internal.network.configuration.SslView;
@@ -68,7 +70,6 @@ import 
org.apache.ignite.internal.network.recovery.RecoveryAcceptorHandshakeMana
 import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
 import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
 import 
org.apache.ignite.internal.network.recovery.RecoveryInitiatorHandshakeManager;
-import 
org.apache.ignite.internal.network.recovery.RecoveryInitiatorHandshakeManagerFactory;
 import org.apache.ignite.internal.network.recovery.StaleIdDetector;
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.internal.network.ssl.SslContextProvider;
@@ -83,7 +84,7 @@ import org.jetbrains.annotations.TestOnly;
  */
 public class ConnectionManager implements ChannelCreationListener {
     /** Message factory. */
-    private static final NetworkMessagesFactory FACTORY = new 
NetworkMessagesFactory();
+    protected static final NetworkMessagesFactory FACTORY = new 
NetworkMessagesFactory();
 
     /** Logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(ConnectionManager.class);
@@ -120,74 +121,39 @@ public class ConnectionManager implements 
ChannelCreationListener {
      */
     private final CompletableFuture<InternalClusterNode> localNodeFuture = new 
CompletableFuture<>();
 
-    private final NettyBootstrapFactory bootstrapFactory;
+    protected final NettyBootstrapFactory bootstrapFactory;
 
     /** Used to detect that a peer uses a stale ID. */
-    private final StaleIdDetector staleIdDetector;
+    protected final StaleIdDetector staleIdDetector;
 
-    private final ClusterIdSupplier clusterIdSupplier;
-
-    /** Factory producing {@link RecoveryInitiatorHandshakeManager} instances. 
*/
-    private final @Nullable RecoveryInitiatorHandshakeManagerFactory 
initiatorHandshakeManagerFactory;
+    protected final ClusterIdSupplier clusterIdSupplier;
 
     /** Start flag. */
     private final AtomicBoolean started = new AtomicBoolean(false);
 
-    private final AtomicBoolean stopping = new AtomicBoolean(false);
+    protected final AtomicBoolean stopping = new AtomicBoolean(false);
 
     /** Stop flag. */
     private final AtomicBoolean stopped = new AtomicBoolean(false);
 
     /** Recovery descriptor provider. */
-    private final RecoveryDescriptorProvider descriptorProvider = new 
DefaultRecoveryDescriptorProvider();
+    protected final RecoveryDescriptorProvider descriptorProvider = new 
DefaultRecoveryDescriptorProvider();
 
     /** Thread pool used for connection management tasks (like disposing 
recovery descriptors on node left or on stop). */
     private final ExecutorService connectionMaintenanceExecutor;
 
     private final ChannelTypeRegistry channelTypeRegistry;
 
-    private final IgniteProductVersionSource productVersionSource;
+    protected final IgniteProductVersionSource productVersionSource;
 
     /** {@code null} if SSL is not {@link SslConfigurationSchema#enabled}. */
     private final @Nullable SslContext clientSslContext;
 
-    /**
-     * Constructor.
-     *
-     * @param networkConfiguration Network configuration.
-     * @param serializationService Serialization service.
-     * @param nodeName Node name.
-     * @param nodeId ID of this node.
-     * @param bootstrapFactory Bootstrap factory.
-     * @param staleIdDetector Detects stale member IDs.
-     * @param clusterIdSupplier Supplier of cluster ID.
-     * @param channelTypeRegistry {@link ChannelType} registry.
-     * @param productVersionSource Source of product version.
-     */
-    public ConnectionManager(
-            NetworkView networkConfiguration,
-            SerializationService serializationService,
-            String nodeName,
-            UUID nodeId,
-            NettyBootstrapFactory bootstrapFactory,
-            StaleIdDetector staleIdDetector,
-            ClusterIdSupplier clusterIdSupplier,
-            ChannelTypeRegistry channelTypeRegistry,
-            IgniteProductVersionSource productVersionSource
-    ) {
-        this(
-                networkConfiguration,
-                serializationService,
-                nodeName,
-                nodeId,
-                bootstrapFactory,
-                staleIdDetector,
-                clusterIdSupplier,
-                null,
-                channelTypeRegistry,
-                productVersionSource
-        );
-    }
+    /** Cluster topology service. */
+    protected final TopologyService topologyService;
+
+    /** Failure processor. */
+    protected final FailureProcessor failureProcessor;
 
     /**
      * Constructor.
@@ -199,9 +165,10 @@ public class ConnectionManager implements 
ChannelCreationListener {
      * @param bootstrapFactory Bootstrap factory.
      * @param staleIdDetector Detects stale member IDs.
      * @param clusterIdSupplier Supplier of cluster ID.
-     * @param initiatorHandshakeManagerFactory Factory for {@link 
RecoveryInitiatorHandshakeManager} instances.
      * @param channelTypeRegistry {@link ChannelType} registry.
      * @param productVersionSource Source of product version.
+     * @param topologyService Cluster topology service.
+     * @param failureProcessor Failure processor.
      */
     public ConnectionManager(
             NetworkView networkConfiguration,
@@ -211,18 +178,20 @@ public class ConnectionManager implements 
ChannelCreationListener {
             NettyBootstrapFactory bootstrapFactory,
             StaleIdDetector staleIdDetector,
             ClusterIdSupplier clusterIdSupplier,
-            @Nullable RecoveryInitiatorHandshakeManagerFactory 
initiatorHandshakeManagerFactory,
             ChannelTypeRegistry channelTypeRegistry,
-            IgniteProductVersionSource productVersionSource
+            IgniteProductVersionSource productVersionSource,
+            TopologyService topologyService,
+            FailureProcessor failureProcessor
     ) {
         this.serializationService = serializationService;
         this.nodeId = nodeId;
         this.bootstrapFactory = bootstrapFactory;
         this.staleIdDetector = staleIdDetector;
         this.clusterIdSupplier = clusterIdSupplier;
-        this.initiatorHandshakeManagerFactory = 
initiatorHandshakeManagerFactory;
         this.channelTypeRegistry = channelTypeRegistry;
         this.productVersionSource = productVersionSource;
+        this.topologyService = topologyService;
+        this.failureProcessor = failureProcessor;
 
         SslView ssl = networkConfiguration.ssl();
 
@@ -525,33 +494,41 @@ public class ConnectionManager implements 
ChannelCreationListener {
     private HandshakeManager createInitiatorHandshakeManager(short 
connectionId) {
         InternalClusterNode localNode = 
Objects.requireNonNull(localNodeFuture.getNow(null), "localNode not set");
 
-        if (initiatorHandshakeManagerFactory == null) {
-            return new RecoveryInitiatorHandshakeManager(
-                    localNode,
-                    connectionId,
-                    descriptorProvider,
-                    bootstrapFactory.handshakeEventLoopSwitcher(),
-                    staleIdDetector,
-                    clusterIdSupplier,
-                    this,
-                    stopping::get,
-                    productVersionSource
-            );
-        }
+        return newRecoveryInitiatorHandshakeManager(connectionId, localNode);
+    }
 
-        return initiatorHandshakeManagerFactory.create(
+    /**
+     * Factory method for overriding the handshake manager implementation in 
subclasses.
+     */
+    protected RecoveryInitiatorHandshakeManager 
newRecoveryInitiatorHandshakeManager(
+            short connectionId,
+            InternalClusterNode localNode
+    ) {
+        return new RecoveryInitiatorHandshakeManager(
                 localNode,
                 connectionId,
-                descriptorProvider
+                descriptorProvider,
+                bootstrapFactory.handshakeEventLoopSwitcher(),
+                staleIdDetector,
+                clusterIdSupplier,
+                this,
+                stopping::get,
+                productVersionSource,
+                topologyService,
+                failureProcessor
         );
     }
 
     private HandshakeManager createAcceptorHandshakeManager() {
         // Do not just use localNodeFuture.join() to make sure the wait is 
time-limited.
-        waitForLocalNodeToBeSet();
+        InternalClusterNode localNode = waitForLocalNodeToBeSet();
 
+        return newRecoveryAcceptorHandshakeManager(localNode);
+    }
+
+    private RecoveryAcceptorHandshakeManager 
newRecoveryAcceptorHandshakeManager(InternalClusterNode localNode) {
         return new RecoveryAcceptorHandshakeManager(
-                localNodeFuture.join(),
+                localNode,
                 FACTORY,
                 descriptorProvider,
                 bootstrapFactory.handshakeEventLoopSwitcher(),
@@ -559,13 +536,14 @@ public class ConnectionManager implements 
ChannelCreationListener {
                 clusterIdSupplier,
                 this,
                 stopping::get,
-                productVersionSource
+                productVersionSource,
+                topologyService
         );
     }
 
-    private void waitForLocalNodeToBeSet() {
+    private InternalClusterNode waitForLocalNodeToBeSet() {
         try {
-            localNodeFuture.get(10, SECONDS);
+            return localNodeFuture.get(10, SECONDS);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
index 1a3ce37a28f..ceef409062b 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
+import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
@@ -105,6 +106,10 @@ public class RecoveryAcceptorHandshakeManager implements 
HandshakeManager {
     /** Recovery descriptor. */
     private RecoveryDescriptor recoveryDescriptor;
 
+    /** Cluster topology service. */
+    @SuppressWarnings("FieldCanBeLocal")
+    private final TopologyService topologyService;
+
     /**
      * Constructor.
      *
@@ -113,6 +118,7 @@ public class RecoveryAcceptorHandshakeManager implements 
HandshakeManager {
      * @param recoveryDescriptorProvider Recovery descriptor provider.
      * @param stopping Defines whether the corresponding connection manager is 
stopping.
      * @param productVersionSource Source of product version.
+     * @param topologyService Cluster topology service.
      */
     public RecoveryAcceptorHandshakeManager(
             InternalClusterNode localNode,
@@ -123,7 +129,8 @@ public class RecoveryAcceptorHandshakeManager implements 
HandshakeManager {
             ClusterIdSupplier clusterIdSupplier,
             ChannelCreationListener channelCreationListener,
             BooleanSupplier stopping,
-            IgniteProductVersionSource productVersionSource
+            IgniteProductVersionSource productVersionSource,
+            TopologyService topologyService
     ) {
         this.localNode = localNode;
         this.messageFactory = messageFactory;
@@ -133,6 +140,7 @@ public class RecoveryAcceptorHandshakeManager implements 
HandshakeManager {
         this.clusterIdSupplier = clusterIdSupplier;
         this.stopping = stopping;
         this.productVersionSource = productVersionSource;
+        this.topologyService = topologyService;
 
         this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
             if (throwable != null) {
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
index 21c9d93559e..db34a4ead3a 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
+import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -42,6 +43,7 @@ import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
 import org.apache.ignite.internal.network.RecipientLeftException;
+import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
 import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
@@ -114,6 +116,14 @@ public class RecoveryInitiatorHandshakeManager implements 
HandshakeManager {
     /** Recovery descriptor. */
     private RecoveryDescriptor recoveryDescriptor;
 
+    /** Cluster topology service. */
+    @SuppressWarnings("FieldCanBeLocal")
+    private final TopologyService topologyService;
+
+    /** Failure processor. */
+    @SuppressWarnings("FieldCanBeLocal")
+    private final FailureProcessor failureProcessor;
+
     /**
      * Constructor.
      *
@@ -131,7 +141,9 @@ public class RecoveryInitiatorHandshakeManager implements 
HandshakeManager {
             ClusterIdSupplier clusterIdSupplier,
             ChannelCreationListener channelCreationListener,
             BooleanSupplier stopping,
-            IgniteProductVersionSource productVersionSource
+            IgniteProductVersionSource productVersionSource,
+            TopologyService topologyService,
+            FailureProcessor failureProcessor
     ) {
         this.localNode = localNode;
         this.connectionId = connectionId;
@@ -141,6 +153,8 @@ public class RecoveryInitiatorHandshakeManager implements 
HandshakeManager {
         this.clusterIdSupplier = clusterIdSupplier;
         this.stopping = stopping;
         this.productVersionSource = productVersionSource;
+        this.topologyService = topologyService;
+        this.failureProcessor = failureProcessor;
 
         localHandshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
             if (throwable != null) {
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerFactory.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerFactory.java
deleted file mode 100644
index d0e92c66657..00000000000
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.network.recovery;
-
-import org.apache.ignite.internal.network.InternalClusterNode;
-
-/**
- * Factory producing {@link RecoveryInitiatorHandshakeManager} instances.
- */
-@FunctionalInterface
-public interface RecoveryInitiatorHandshakeManagerFactory {
-    /**
-     * Produces a {@link RecoveryInitiatorHandshakeManager} instance.
-     *
-     * @param localNode This node.
-     * @param connectionId ID of the connection.
-     * @param recoveryDescriptorProvider Provider of recovery descriptors to 
be used.
-     * @return Created manager.
-     */
-    RecoveryInitiatorHandshakeManager create(
-            InternalClusterNode localNode,
-            short connectionId,
-            RecoveryDescriptorProvider recoveryDescriptorProvider
-    );
-}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
index debf3079834..c044f3c095b 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -162,7 +162,9 @@ public class ScaleCubeClusterServiceFactory {
                         staleIds,
                         clusterIdSupplier,
                         channelTypeRegistry,
-                        productVersionSource
+                        productVersionSource,
+                        topologyService,
+                        failureProcessor
                 );
                 this.connectionMgr = connectionMgr;
 
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index 3d87670c2b2..11d7ad11b80 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -61,6 +61,7 @@ import java.util.regex.Pattern;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.messages.AllTypesMessageImpl;
@@ -72,9 +73,7 @@ import 
org.apache.ignite.internal.network.messages.TestMessageTypes;
 import org.apache.ignite.internal.network.messages.TestMessagesFactory;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
 import org.apache.ignite.internal.network.recovery.AllIdsAreFresh;
-import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
 import 
org.apache.ignite.internal.network.recovery.RecoveryInitiatorHandshakeManager;
-import 
org.apache.ignite.internal.network.recovery.RecoveryInitiatorHandshakeManagerFactory;
 import org.apache.ignite.internal.network.recovery.StaleIdDetector;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
 import 
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
@@ -624,17 +623,14 @@ class DefaultMessagingServiceTest extends 
BaseIgniteAbstractTest {
         NettyBootstrapFactory bootstrapFactory = new 
NettyBootstrapFactory(networkConfig, eventLoopGroupNamePrefix);
         assertThat(bootstrapFactory.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
 
-        ConnectionManager connectionManager = new ConnectionManager(
-                networkConfig.value(),
+        ConnectionManager connectionManager = new TestConnectionManager(
+                networkConfig,
                 serializationService,
-                node.name(),
-                node.id(),
+                node,
                 bootstrapFactory,
                 staleIdDetector,
                 clusterIdSupplier,
-                initiatorHandshakeManagerFactoryAdding(beforeHandshake, 
bootstrapFactory, staleIdDetector, clusterIdSupplier),
-                channelTypeRegistry,
-                new DefaultIgniteProductVersionSource()
+                beforeHandshake
         );
         connectionManager.start();
         connectionManager.setLocalNode(node);
@@ -646,39 +642,61 @@ class DefaultMessagingServiceTest extends 
BaseIgniteAbstractTest {
         return new Services(connectionManager, messagingService, 
bootstrapFactory);
     }
 
-    private RecoveryInitiatorHandshakeManagerFactory 
initiatorHandshakeManagerFactoryAdding(
-            Runnable beforeHandshake,
-            NettyBootstrapFactory bootstrapFactory,
-            StaleIdDetector staleIdDetector,
-            ClusterIdSupplier clusterIdSupplier
-    ) {
-        return new RecoveryInitiatorHandshakeManagerFactory() {
-            @Override
-            public RecoveryInitiatorHandshakeManager create(
-                    InternalClusterNode localNode,
-                    short connectionId,
-                    RecoveryDescriptorProvider recoveryDescriptorProvider
+    private class TestConnectionManager extends ConnectionManager {
+        private final Runnable beforeHandshake;
+
+        private TestConnectionManager(
+                NetworkConfiguration networkConfig,
+                SerializationService serializationService,
+                InternalClusterNode node,
+                NettyBootstrapFactory bootstrapFactory,
+                StaleIdDetector staleIdDetector,
+                ClusterIdSupplier clusterIdSupplier,
+                Runnable beforeHandshake
+        ) {
+            super(
+                    networkConfig.value(),
+                    serializationService,
+                    node.name(),
+                    node.id(),
+                    bootstrapFactory,
+                    staleIdDetector,
+                    clusterIdSupplier,
+                    DefaultMessagingServiceTest.this.channelTypeRegistry,
+                    new DefaultIgniteProductVersionSource(),
+                    DefaultMessagingServiceTest.this.topologyService,
+                    DefaultMessagingServiceTest.this.failureProcessor
+            );
+
+            this.beforeHandshake = beforeHandshake;
+        }
+
+        @Override
+        protected RecoveryInitiatorHandshakeManager 
newRecoveryInitiatorHandshakeManager(
+                short connectionId,
+                InternalClusterNode localNode
+        ) {
+            return new RecoveryInitiatorHandshakeManager(
+                    localNode,
+                    connectionId,
+                    descriptorProvider,
+                    bootstrapFactory.handshakeEventLoopSwitcher(),
+                    staleIdDetector,
+                    clusterIdSupplier,
+                    channel -> {},
+                    () -> false,
+                    new DefaultIgniteProductVersionSource(),
+                    this.topologyService,
+                    new NoOpFailureManager()
             ) {
-                return new RecoveryInitiatorHandshakeManager(
-                        localNode,
-                        connectionId,
-                        recoveryDescriptorProvider,
-                        bootstrapFactory.handshakeEventLoopSwitcher(),
-                        staleIdDetector,
-                        clusterIdSupplier,
-                        channel -> {},
-                        () -> false,
-                        new DefaultIgniteProductVersionSource()
-                ) {
-                    @Override
-                    protected void finishHandshake() {
-                        beforeHandshake.run();
-
-                        super.finishHandshake();
-                    }
-                };
-            }
-        };
+                @Override
+                protected void finishHandshake() {
+                    beforeHandshake.run();
+
+                    super.finishHandshake();
+                }
+            };
+        }
     }
 
     private static String localHostName() {
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index 24b755118e7..c5f85e2233a 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.embedded.EmbeddedChannel;
@@ -38,12 +39,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.network.ClusterIdSupplier;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
+import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.handshake.NoOpHandshakeEventLoopSwitcher;
 import org.apache.ignite.internal.network.messages.TestMessage;
@@ -101,6 +104,8 @@ public class RecoveryHandshakeTest extends BaseIgniteAbstractTest {
 
     private final ClusterIdSupplier clusterIdSupplier = new ConstantClusterIdSupplier(UUID.randomUUID());
 
+    protected final TopologyService topologyService = mock(TopologyService.class);
+
     @Test
     public void testHandshake() throws Exception {
         RecoveryDescriptorProvider initiatorRecovery = createRecoveryDescriptorProvider();
@@ -769,7 +774,9 @@ public class RecoveryHandshakeTest extends BaseIgniteAbstractTest {
                 clusterIdSupplier,
                 channel -> {},
                 () -> false,
-                new DefaultIgniteProductVersionSource()
+                new DefaultIgniteProductVersionSource(),
+                topologyService,
+                new NoOpFailureManager()
         );
     }
 
@@ -802,7 +809,8 @@ public class RecoveryHandshakeTest extends BaseIgniteAbstractTest {
                 clusterIdSupplier,
                 channel -> {},
                 () -> false,
-                new DefaultIgniteProductVersionSource()
+                new DefaultIgniteProductVersionSource(),
+                topologyService
         );
     }
 
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
index f15ebf5ac14..80187692436 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
 import org.apache.ignite.internal.network.OutNetworkObject;
+import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.NoOpHandshakeEventLoopSwitcher;
 import org.apache.ignite.internal.network.netty.ChannelCreationListener;
@@ -102,6 +103,9 @@ class RecoveryAcceptorHandshakeManagerTest extends HandshakeManagerTest {
     @Captor
     private ArgumentCaptor<OutNetworkObject> sentMessageCaptor;
 
+    @Mock
+    private TopologyService topologyService;
+
     private final RecoveryDescriptor recoveryDescriptor = new RecoveryDescriptor(100);
 
     private final AtomicBoolean acceptorHandshakeManagerStopping = new AtomicBoolean(false);
@@ -175,7 +179,8 @@ class RecoveryAcceptorHandshakeManagerTest extends HandshakeManagerTest {
                 new ConstantClusterIdSupplier(CORRECT_CLUSTER_ID),
                 channelCreationListener,
                 stopping,
-                new DefaultIgniteProductVersionSource()
+                new DefaultIgniteProductVersionSource(),
+                topologyService
         );
 
         manager.onInit(context);
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
index e1285ebaa6b..6458e900aa0 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManagerTest.java
@@ -46,11 +46,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
+import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.NoOpHandshakeEventLoopSwitcher;
@@ -123,6 +125,9 @@ class RecoveryInitiatorHandshakeManagerTest extends HandshakeManagerTest {
 
     private final AtomicBoolean initiatorHandshakeManagerStopping = new AtomicBoolean(false);
 
+    @Mock
+    private TopologyService topologyService;
+
     @BeforeEach
     void initMocks() {
         lenient().when(thisContext.channel()).thenReturn(thisChannel);
@@ -189,7 +194,9 @@ class RecoveryInitiatorHandshakeManagerTest extends HandshakeManagerTest {
                 new ConstantClusterIdSupplier(CORRECT_CLUSTER_ID),
                 channelCreationListener,
                 stopping,
-                new DefaultIgniteProductVersionSource()
+                new DefaultIgniteProductVersionSource(),
+                topologyService,
+                new NoOpFailureManager()
         );
 
         manager.onInit(thisContext);


Reply via email to