This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 527b7b27d5 IGNITE-19903 Fix recovery descriptor race condition (#2283)
527b7b27d5 is described below

commit 527b7b27d5cfeb705ff38a0f73fae0a6500ffcb7
Author: Semyon Danilov <[email protected]>
AuthorDate: Thu Jul 13 15:50:43 2023 +0400

    IGNITE-19903 Fix recovery descriptor race condition (#2283)
---
 .../ignite/internal/future/OrderingFuture.java     | 63 +++++++++++++-
 .../CompletableFutureCompletedMatcher.java         | 61 ++++++++++++++
 .../network/netty/ItConnectionManagerTest.java     | 97 ++++++++++++++++++++++
 .../scalecube/ItScaleCubeNetworkMessagingTest.java | 37 +++++++--
 ...ion.java => ChannelAlreadyExistsException.java} | 27 +++---
 .../network/handshake/HandshakeException.java      |  4 +-
 .../ChannelCreationListener.java}                  | 26 ++----
 .../internal/network/netty/ConnectionManager.java  | 50 +++++------
 .../netty/DefaultRecoveryDescriptorProvider.java   | 14 +---
 .../network/netty/InboundRecoveryHandler.java      |  8 ++
 .../ignite/internal/network/netty/NettySender.java |  9 +-
 .../ignite/internal/network/netty/NettyServer.java |  8 --
 .../network/recovery/HandshakeTieBreaker.java      | 40 +++++++++
 .../recovery/RecoveryClientHandshakeManager.java   | 92 +++++++++++++++-----
 .../network/recovery/RecoveryDescriptor.java       | 46 ++++++++++
 .../recovery/RecoveryDescriptorProvider.java       |  3 +-
 .../recovery/RecoveryServerHandshakeManager.java   | 96 ++++++++++++++++-----
 .../ignite/network/DefaultMessagingService.java    | 26 +++++-
 .../internal/network/netty/NettyServerTest.java    |  8 +-
 .../network/netty/RecoveryHandshakeTest.java       | 53 +++++++-----
 .../network/DefaultMessagingServiceTest.java       |  3 +-
 .../placementdriver/leases/LeaseTracker.java       |  1 -
 22 files changed, 592 insertions(+), 180 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java
index 6aa95c04ff..40b7821f07 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import org.jetbrains.annotations.Nullable;
 
@@ -299,8 +300,8 @@ public class OrderingFuture<T> {
     private static <T> void acceptQuietly(BiConsumer<? super T, ? super 
Throwable> action, T result, Throwable ex) {
         try {
             action.accept(result, ex);
-        } catch (Exception e) {
-            // ignore
+        } catch (Exception ignored) {
+            // No-op.
         }
     }
 
@@ -337,6 +338,41 @@ public class OrderingFuture<T> {
         }
     }
 
+    /**
+     * Adds a mapping function that gets executed as soon as this future gets 
completed for any reason. The function will accept both result
+     * and exception and return a future with the result of the function's 
execution.
+     *
+     * @param mapper The function to use to compute the value of the returned 
OrderingFuture.
+     * @return The new OrderingFuture.
+     * @see CompletableFuture#handle(BiFunction)
+     */
+    public <U> OrderingFuture<U> handle(BiFunction<? super T, Throwable, ? 
extends U> mapper) {
+        Handle<T, U> dependent = null;
+
+        while (true) {
+            State<T> prevState = state;
+
+            if (prevState.completionQueueProcessed()) {
+                try {
+                    U mappingResult = mapper.apply(prevState.result, 
prevState.exception);
+
+                    return completedFuture(mappingResult);
+                } catch (Throwable t) {
+                    return failedFuture(t);
+                }
+            }
+
+            if (dependent == null) {
+                dependent = new Handle<>(new OrderingFuture<>(), mapper);
+            }
+            State<T> newState = prevState.enqueueDependent(dependent);
+
+            if (replaceState(prevState, newState)) {
+                return dependent.resultFuture;
+            }
+        }
+    }
+
     private static CompletionException wrapWithCompletionException(Throwable 
ex) {
         return ex instanceof CompletionException ? (CompletionException) ex : 
new CompletionException(ex);
     }
@@ -463,6 +499,25 @@ public class OrderingFuture<T> {
         }
     }
 
+    private static class Handle<T, U> implements DependentAction<T> {
+        private final OrderingFuture<U> resultFuture;
+        private final BiFunction<? super T, Throwable, ? extends U> action;
+
+        private Handle(OrderingFuture<U> resultFuture, BiFunction<? super T, 
Throwable, ? extends U> action) {
+            this.resultFuture = resultFuture;
+            this.action = action;
+        }
+
+        @Override
+        public void onCompletion(@Nullable T result, @Nullable Throwable ex, 
NotificationContext context) {
+            try {
+                resultFuture.complete(action.apply(result, ex));
+            } catch (Throwable t) {
+                resultFuture.completeExceptionally(t);
+            }
+        }
+    }
+
     private static class ThenComposeToCompletable<T, U> implements 
DependentAction<T>, BiConsumer<U, Throwable> {
         private final CompletableFuture<U> resultFuture;
         private final Function<? super T, ? extends CompletableFuture<U>> 
mapper;
@@ -563,8 +618,8 @@ public class OrderingFuture<T> {
 
                 try {
                     node.dependent.onCompletion(result, exception, context);
-                } catch (Exception e) {
-                    // ignore
+                } catch (Exception ignored) {
+                    // No-op.
                 }
             }
         }
diff --git 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureCompletedMatcher.java
 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureCompletedMatcher.java
new file mode 100644
index 0000000000..c9047a5d67
--- /dev/null
+++ 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureCompletedMatcher.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.matchers;
+
+import java.util.concurrent.CompletableFuture;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+/** A matcher that tests if a CompletableFuture has completed normally (neither exceptionally nor by cancellation). */
+public class CompletableFutureCompletedMatcher<T> extends TypeSafeMatcher<CompletableFuture<T>> {
+    private CompletableFutureCompletedMatcher() {
+    }
+
+    @Override
+    protected boolean matchesSafely(CompletableFuture<T> future) {
+        return future.isDone() && !future.isCompletedExceptionally() && !future.isCancelled();
+    }
+
+    @Override
+    public void describeTo(Description description) {
+        description.appendText("is a successfully completed CompletableFuture");
+    }
+
+    /** Explains why {@code item} did not match, distinguishing pending, cancelled and failed futures. */
+    @Override
+    public void describeMismatchSafely(CompletableFuture<T> item, Description mismatchDescription) {
+        if (!item.isDone()) {
+            mismatchDescription.appendText("was not completed");
+        } else if (item.isCancelled()) {
+            // Must precede the isCompletedExceptionally() check: that method also returns true for cancelled futures.
+            mismatchDescription.appendText("was cancelled");
+        } else if (item.isCompletedExceptionally()) {
+            mismatchDescription.appendText("completed exceptionally");
+        } else {
+            // It might be successfully done now, but it wasn't at the moment of matchesSafely execution.
+            mismatchDescription.appendText("was not completed");
+        }
+    }
+
+    /**
+     * Creates a {@link CompletableFutureCompletedMatcher}.
+     */
+    public static <T> CompletableFutureCompletedMatcher<T> completedFuture() {
+        return new CompletableFutureCompletedMatcher<T>();
+    }
+}
diff --git 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 9a50a77c68..7ea89fec58 100644
--- 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.ignite.internal.network.netty;
 
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
 import static 
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -37,6 +40,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -59,6 +63,7 @@ import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.OutNetworkObject;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -295,6 +300,95 @@ public class ItConnectionManagerTest {
         server.close();
     }
 
+    /**
+     * Tests that if two nodes are opening channels to each other, only one 
channel survives.
+     *
+     * @throws Exception If failed.
+     */
+    @RepeatedTest(100)
+    public void testOneChannelLeftIfConnectToEachOther() throws Exception {
+        try (
+                ConnectionManagerWrapper manager1 = startManager(4000);
+                ConnectionManagerWrapper manager2 = startManager(4001)
+        ) {
+            CompletableFuture<NettySender> fut1 = 
manager1.openChannelTo(manager2).toCompletableFuture();
+            CompletableFuture<NettySender> fut2 = 
manager2.openChannelTo(manager1).toCompletableFuture();
+
+            NettySender sender1 = null;
+            NettySender sender2 = null;
+
+            try {
+                sender1 = fut1.get(1, TimeUnit.SECONDS);
+            } catch (Exception ignored) {
+                // No-op.
+            }
+            try {
+                sender2 = fut2.get(1, TimeUnit.SECONDS);
+            } catch (Exception ignored) {
+                // No-op.
+            }
+
+            NettySender highlander = null;
+
+            assertTrue(sender1 != null || sender2 != null);
+
+            if (sender1 != null && sender1.isOpen()) {
+                highlander = sender1;
+
+                boolean sender2NullOrClosed = sender2 == null || 
!sender2.isOpen();
+
+                assertTrue(sender2NullOrClosed);
+            }
+
+            if (sender2 != null && sender2.isOpen()) {
+                highlander = sender2;
+
+                boolean sender1NullOrClosed = sender1 == null || 
!sender1.isOpen();
+
+                assertTrue(sender1NullOrClosed);
+            }
+
+            assertNotNull(highlander);
+            assertTrue(highlander.isOpen());
+
+            assertTrue(
+                    waitForCondition(
+                            () -> manager1.channels().size() == 1 && 
manager2.channels().size() == 1,
+                            TimeUnit.SECONDS.toMillis(1)
+                    )
+            );
+
+            CompletableFuture<NettySender> channelFut1 = 
manager1.connectionManager.channel(
+                    manager2.connectionManager.consistentId(),
+                    ChannelType.DEFAULT,
+                    manager2.connectionManager.localAddress()
+            ).toCompletableFuture();
+
+            CompletableFuture<NettySender> channelFut2 = 
manager2.connectionManager.channel(
+                    manager1.connectionManager.consistentId(),
+                    ChannelType.DEFAULT,
+                    manager1.connectionManager.localAddress()
+            ).toCompletableFuture();
+
+            assertThat(channelFut1, is(completedFuture()));
+            assertThat(channelFut2, is(completedFuture()));
+
+            NettySender channel1 = channelFut1.getNow(null);
+            NettySender channel2 = channelFut2.getNow(null);
+
+            InetSocketAddress locAddr1 = (InetSocketAddress) 
channel1.channel().localAddress();
+            InetSocketAddress remoteAddr1 = (InetSocketAddress) 
channel1.channel().remoteAddress();
+
+            InetSocketAddress locAddr2 = (InetSocketAddress) 
channel2.channel().localAddress();
+            InetSocketAddress remoteAddr2 = (InetSocketAddress) 
channel2.channel().remoteAddress();
+
+            // Only compare ports because hosts may look different, eg 
localhost and 0.0.0.0. They are technically not same,
+            // although equal.
+            assertEquals(locAddr1.getPort(), remoteAddr2.getPort());
+            assertEquals(locAddr2.getPort(), remoteAddr1.getPort());
+        }
+    }
+
     /**
      * Creates a mock {@link MessageSerializationRegistry} that throws an 
exception when trying to get a serializer or a deserializer.
      */
@@ -383,5 +477,8 @@ public class ItConnectionManagerTest {
             );
         }
 
+        public Map<ConnectorKey<String>, NettySender> channels() {
+            return connectionManager.channels();
+        }
     }
 }
diff --git 
a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
 
b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index 6f5c970339..782725c95a 100644
--- 
a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++ 
b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.network.scalecube;
 
+import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toUnmodifiableList;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
@@ -51,6 +52,7 @@ import 
org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.messages.TestMessage;
 import org.apache.ignite.internal.network.messages.TestMessageTypes;
 import org.apache.ignite.internal.network.messages.TestMessagesFactory;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
 import 
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
 import 
org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
@@ -355,25 +357,44 @@ class ItScaleCubeNetworkMessagingTest {
     }
 
     private void knockOutNode(String outcastName) throws InterruptedException {
-        CountDownLatch disappeared = new CountDownLatch(1);
+        CountDownLatch disappeared = new CountDownLatch(2);
 
-        testCluster.members.get(0).topologyService().addEventHandler(new 
TopologyEventHandler() {
+        TopologyEventHandler disappearListener = new TopologyEventHandler() {
             @Override
             public void onDisappeared(ClusterNode member) {
                 if (Objects.equals(member.name(), outcastName)) {
                     disappeared.countDown();
                 }
             }
-        });
+        };
 
-        testCluster.members.stream()
+        List<ClusterService> notOutcasts = testCluster.members.stream()
                 .filter(service -> !outcastName.equals(service.nodeName()))
-                .forEach(service -> {
-                    DefaultMessagingService messagingService = 
(DefaultMessagingService) service.messagingService();
-                    messagingService.dropMessages((recipientConsistentId, 
message) -> outcastName.equals(recipientConsistentId));
-                });
+                .collect(toList());
 
+        notOutcasts.forEach(clusterService -> {
+            
clusterService.topologyService().addEventHandler(disappearListener);
+        });
+
+        notOutcasts.forEach(service -> {
+            DefaultMessagingService messagingService = 
(DefaultMessagingService) service.messagingService();
+            messagingService.dropMessages((recipientConsistentId, message) -> 
outcastName.equals(recipientConsistentId));
+        });
+
+        // Wait until all nodes see disappearance of the outcast.
         assertTrue(disappeared.await(10, TimeUnit.SECONDS), "Node did not 
disappear in time");
+
+        DefaultMessagingService messagingService = (DefaultMessagingService) 
testCluster.members.stream()
+                .filter(service -> outcastName.equals(service.nodeName()))
+                .findFirst()
+                .get().messagingService();
+
+        ConnectionManager cm = messagingService.connectionManager();
+
+        // Forcefully close channels, so that nodes will create new channels 
on reanimation of the outcast.
+        cm.channels().forEach((stringConnectorKey, nettySender) -> {
+            nettySender.close().awaitUninterruptibly();
+        });
     }
 
     private IgniteBiTuple<CountDownLatch, AtomicBoolean> reanimateNode(String 
outcastName) {
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/ChannelAlreadyExistsException.java
similarity index 61%
copy from 
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/ChannelAlreadyExistsException.java
index 197f68438b..96640e4506 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/ChannelAlreadyExistsException.java
@@ -18,25 +18,22 @@
 package org.apache.ignite.internal.network.handshake;
 
 /**
- * Handshake exception.
+ * Exception that notifies of existence of a channel with a specific 
consistent id during handshake.
  */
-public class HandshakeException extends Exception {
-    /**
-     * Constructor.
-     *
-     * @param message Handshake error message.
-     */
-    public HandshakeException(String message) {
-        super(message);
+public class ChannelAlreadyExistsException extends RuntimeException {
+    private static final long serialVersionUID = 0L;
+
+    /** Consistent id of a remote node. */
+    private final String consistentId;
+
+    public ChannelAlreadyExistsException(String consistentId) {
+        this.consistentId = consistentId;
     }
 
     /**
-     * Constructor.
-     *
-     * @param message Handshake error message.
-     * @param cause   Handshake error cause.
+     * Returns consistent id of the remote node with which a channel already 
exists.
      */
-    public HandshakeException(String message, Throwable cause) {
-        super(message, cause);
+    public String consistentId() {
+        return consistentId;
     }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
index 197f68438b..39b8fe02da 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.network.handshake;
 /**
  * Handshake exception.
  */
-public class HandshakeException extends Exception {
+public class HandshakeException extends RuntimeException {
+    private static final long serialVersionUID = 0L;
+
     /**
      * Constructor.
      *
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelCreationListener.java
similarity index 61%
copy from 
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelCreationListener.java
index 197f68438b..efdab3ba24 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelCreationListener.java
@@ -15,28 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.network.handshake;
-
-/**
- * Handshake exception.
- */
-public class HandshakeException extends Exception {
-    /**
-     * Constructor.
-     *
-     * @param message Handshake error message.
-     */
-    public HandshakeException(String message) {
-        super(message);
-    }
+package org.apache.ignite.internal.network.netty;
 
+/** Channel creation listener. */
+public interface ChannelCreationListener {
     /**
-     * Constructor.
+     * Notifies of the handshake's finish.
      *
-     * @param message Handshake error message.
-     * @param cause   Handshake error cause.
+     * @param channel Opened channel.
      */
-    public HandshakeException(String message, Throwable cause) {
-        super(message, cause);
-    }
+    void handshakeFinished(NettySender channel);
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 371c295e48..11988c6b4c 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -54,7 +54,7 @@ import org.jetbrains.annotations.TestOnly;
 /**
  * Class that manages connections both incoming and outgoing.
  */
-public class ConnectionManager {
+public class ConnectionManager implements ChannelCreationListener {
     /** Message factory. */
     private static final NetworkMessagesFactory FACTORY = new 
NetworkMessagesFactory();
 
@@ -92,7 +92,7 @@ public class ConnectionManager {
     private final StaleIdDetector staleIdDetector;
 
     /** Factory producing {@link RecoveryClientHandshakeManager} instances. */
-    private final RecoveryClientHandshakeManagerFactory 
clientHandshakeManagerFactory;
+    private final @Nullable RecoveryClientHandshakeManagerFactory 
clientHandshakeManagerFactory;
 
     /** Start flag. */
     private final AtomicBoolean started = new AtomicBoolean(false);
@@ -131,7 +131,7 @@ public class ConnectionManager {
                 consistentId,
                 bootstrapFactory,
                 staleIdDetector,
-                new 
DefaultRecoveryClientHandshakeManagerFactory(staleIdDetector)
+                null
         );
     }
 
@@ -153,7 +153,7 @@ public class ConnectionManager {
             String consistentId,
             NettyBootstrapFactory bootstrapFactory,
             StaleIdDetector staleIdDetector,
-            RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactory
+            @Nullable RecoveryClientHandshakeManagerFactory 
clientHandshakeManagerFactory
     ) {
         this.serializationService = serializationService;
         this.launchId = launchId;
@@ -165,7 +165,6 @@ public class ConnectionManager {
         this.server = new NettyServer(
                 networkConfiguration,
                 this::createServerHandshakeManager,
-                this::onNewIncomingChannel,
                 this::onMessage,
                 serializationService,
                 bootstrapFactory
@@ -221,6 +220,7 @@ public class ConnectionManager {
      * @return Sender.
      */
     public OrderingFuture<NettySender> channel(@Nullable String consistentId, 
ChannelType type, InetSocketAddress address) {
+        // Problem is we can't look up a channel by consistent id because 
consistent id is not known yet.
         if (consistentId != null) {
             // If consistent id is known, try looking up a channel by 
consistent id. There can be an outbound connection
             // or an inbound connection associated with that consistent id.
@@ -263,13 +263,12 @@ public class ConnectionManager {
      *
      * @param channel Channel from client to this {@link #server}.
      */
-    private void onNewIncomingChannel(NettySender channel) {
+    @Override
+    public void handshakeFinished(NettySender channel) {
         ConnectorKey<String> key = new ConnectorKey<>(channel.consistentId(), 
getChannel(channel.channelId()));
         NettySender oldChannel = channels.put(key, channel);
 
-        if (oldChannel != null) {
-            oldChannel.close();
-        }
+        assert oldChannel == null : "Incorrect channel creation flow";
     }
 
     /**
@@ -288,10 +287,7 @@ public class ConnectionManager {
         );
 
         client.start(clientBootstrap).whenComplete((sender, throwable) -> {
-            if (throwable == null) {
-                ConnectorKey<String> key = new 
ConnectorKey<>(sender.consistentId(), getChannel(sender.channelId()));
-                channels.put(key, sender);
-            } else {
+            if (throwable != null) {
                 clients.remove(new ConnectorKey<>(address, channelType));
             }
         });
@@ -344,6 +340,10 @@ public class ConnectionManager {
     }
 
     private HandshakeManager createClientHandshakeManager(short connectionId) {
+        if (clientHandshakeManagerFactory == null) {
+            return new RecoveryClientHandshakeManager(launchId, consistentId, 
connectionId, descriptorProvider, staleIdDetector, this);
+        }
+
         return clientHandshakeManagerFactory.create(
                 launchId,
                 consistentId,
@@ -353,7 +353,7 @@ public class ConnectionManager {
     }
 
     private HandshakeManager createServerHandshakeManager() {
-        return new RecoveryServerHandshakeManager(launchId, consistentId, 
FACTORY, descriptorProvider, staleIdDetector);
+        return new RecoveryServerHandshakeManager(launchId, consistentId, 
FACTORY, descriptorProvider, staleIdDetector, this);
     }
 
     /**
@@ -391,22 +391,12 @@ public class ConnectionManager {
     }
 
     /**
-     * Factory producing vanilla {@link RecoveryClientHandshakeManager} 
instances.
+     * Returns collection of all channels of this connection manager.
+     *
+     * @return Collection of all channels of this connection manager.
      */
-    private static class DefaultRecoveryClientHandshakeManagerFactory 
implements RecoveryClientHandshakeManagerFactory {
-        private final StaleIdDetector staleIdDetector;
-
-        private DefaultRecoveryClientHandshakeManagerFactory(StaleIdDetector 
staleIdDetector) {
-            this.staleIdDetector = staleIdDetector;
-        }
-
-        @Override
-        public RecoveryClientHandshakeManager create(UUID launchId,
-                String consistentId,
-                short connectionId,
-                RecoveryDescriptorProvider recoveryDescriptorProvider
-        ) {
-            return new RecoveryClientHandshakeManager(launchId, consistentId, 
connectionId, recoveryDescriptorProvider, staleIdDetector);
-        }
+    @TestOnly
+    public Map<ConnectorKey<String>, NettySender> channels() {
+        return Map.copyOf(channels);
     }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
index e8ffa5b042..89a8f3b8ff 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
@@ -36,8 +36,8 @@ public class DefaultRecoveryDescriptorProvider implements 
RecoveryDescriptorProv
 
     /** {@inheritDoc} */
     @Override
-    public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID 
launchId, short connectionIndex, boolean inbound) {
-        var key = new ChannelKey(consistentId, launchId, connectionIndex, 
inbound);
+    public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID 
launchId, short connectionIndex) {
+        var key = new ChannelKey(consistentId, launchId, connectionIndex);
 
         return recoveryDescriptors.computeIfAbsent(key, channelKey -> new 
RecoveryDescriptor(DEFAULT_QUEUE_LIMIT));
     }
@@ -56,14 +56,10 @@ public class DefaultRecoveryDescriptorProvider implements 
RecoveryDescriptorProv
          */
         private final short connectionId;
 
-        /** {@code true} if channel is inbound, {@code false} otherwise. */
-        private final boolean inbound;
-
-        private ChannelKey(String consistentId, UUID launchId, short 
connectionId, boolean inbound) {
+        private ChannelKey(String consistentId, UUID launchId, short 
connectionId) {
             this.consistentId = consistentId;
             this.launchId = launchId;
             this.connectionId = connectionId;
-            this.inbound = inbound;
         }
 
         /** {@inheritDoc} */
@@ -81,9 +77,6 @@ public class DefaultRecoveryDescriptorProvider implements 
RecoveryDescriptorProv
             if (connectionId != that.connectionId) {
                 return false;
             }
-            if (inbound != that.inbound) {
-                return false;
-            }
             if (!consistentId.equals(that.consistentId)) {
                 return false;
             }
@@ -96,7 +89,6 @@ public class DefaultRecoveryDescriptorProvider implements 
RecoveryDescriptorProv
             int result = consistentId.hashCode();
             result = 31 * result + launchId.hashCode();
             result = 31 * result + connectionId;
-            result = 31 * result + (inbound ? 1 : 0);
             return result;
         }
 
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java
index 348bde848c..92d50d1749 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java
@@ -25,6 +25,7 @@ import 
org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
 import 
org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.OutNetworkObject;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Inbound handler that handles incoming acknowledgement messages and sends 
acknowledgement messages for other messages.
@@ -69,4 +70,11 @@ public class InboundRecoveryHandler extends 
ChannelInboundHandlerAdapter {
 
         super.channelRead(ctx, message);
     }
+
+    @Override
+    public void channelInactive(@NotNull ChannelHandlerContext ctx) throws 
Exception {
+        descriptor.release(ctx);
+
+        super.channelInactive(ctx);
+    }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
index 2294646d60..f6415cd4c8 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.network.netty;
 import static 
org.apache.ignite.internal.network.netty.NettyUtils.toCompletableFuture;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.handler.stream.ChunkedInput;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.network.direct.DirectMessageWriter;
@@ -94,10 +95,12 @@ public class NettySender {
     }
 
     /**
-     * Closes channel.
+     * Initiates closing of the channel without waiting for it to complete.
+     *
+     * @return Future of the close operation returned by {@link Channel#close()} on the {@link #channel}.
      */
-    public void close() {
-        this.channel.close().awaitUninterruptibly();
+    public ChannelFuture close() {
+        return this.channel.close();
     }
 
     /**
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index 7242c25725..ba2d5ff73f 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -73,9 +73,6 @@ public class NettyServer {
     @Nullable
     private CompletableFuture<Void> serverCloseFuture;
 
-    /** New connections listener. */
-    private final Consumer<NettySender> newConnectionListener;
-
     /** Flag indicating if {@link #stop()} has been called. */
     private boolean stopped;
 
@@ -84,7 +81,6 @@ public class NettyServer {
      *
      * @param configuration         Server configuration.
      * @param handshakeManager      Handshake manager supplier.
-     * @param newConnectionListener New connections listener.
      * @param messageListener       Message listener.
      * @param serializationService  Serialization service.
      * @param bootstrapFactory      Netty bootstrap factory.
@@ -92,14 +88,12 @@ public class NettyServer {
     public NettyServer(
             NetworkView configuration,
             Supplier<HandshakeManager> handshakeManager,
-            Consumer<NettySender> newConnectionListener,
             Consumer<InNetworkObject> messageListener,
             SerializationService serializationService,
             NettyBootstrapFactory bootstrapFactory
     ) {
         this.configuration = configuration;
         this.handshakeManager = handshakeManager;
-        this.newConnectionListener = newConnectionListener;
         this.messageListener = messageListener;
         this.serializationService = serializationService;
         this.bootstrapFactory = bootstrapFactory;
@@ -137,8 +131,6 @@ public class NettyServer {
                             } else {
                                 PipelineUtils.setup(ch.pipeline(), 
sessionSerializationService, manager, messageListener);
                             }
-
-                            
manager.handshakeFuture().thenAccept(newConnectionListener);
                         }
                     });
 
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java
new file mode 100644
index 0000000000..e3bf6018ef
--- /dev/null
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.UUID;
+
+/**
+ * The HandshakeTieBreaker class provides a mechanism for determining whether 
an existing channel should be closed during a handshake
+ * process.
+ */
+class HandshakeTieBreaker {
+    /**
+     * Determines whether an existing channel should be closed based on the 
comparison of the server's launch id and the client's launch id.
+     * If the client's launch id is greater than the server's launch id, the 
existing channel should be closed in favor of the new one. If
+     * the client's launch id is less than or equal to the server's launch id, the existing channel should be kept and the new
+     * one rejected.
+     *
+     * @param serverLaunchId Server's launch id.
+     * @param clientLaunchId Client's launch id.
+     * @return {@code true} if an existing channel should be closed, {@code 
false} otherwise.
+     */
+    static boolean shouldCloseChannel(UUID serverLaunchId, UUID 
clientLaunchId) {
+        return clientLaunchId.compareTo(serverLaunchId) > 0;
+    }
+}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index cefc0a16a1..53a00335a9 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -18,19 +18,21 @@
 package org.apache.ignite.internal.network.recovery;
 
 import static java.util.Collections.emptyList;
+import static 
org.apache.ignite.internal.network.recovery.HandshakeTieBreaker.shouldCloseChannel;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
-import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import 
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.netty.ChannelCreationListener;
 import org.apache.ignite.internal.network.netty.HandshakeHandler;
 import org.apache.ignite.internal.network.netty.MessageHandler;
 import org.apache.ignite.internal.network.netty.NettySender;
@@ -104,13 +106,34 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
             String consistentId,
             short connectionId,
             RecoveryDescriptorProvider recoveryDescriptorProvider,
-            StaleIdDetector staleIdDetector
+            StaleIdDetector staleIdDetector,
+            ChannelCreationListener channelCreationListener
     ) {
         this.launchId = launchId;
         this.consistentId = consistentId;
         this.connectionId = connectionId;
         this.recoveryDescriptorProvider = recoveryDescriptorProvider;
         this.staleIdDetector = staleIdDetector;
+
+        this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
+            if (throwable != null) {
+                releaseResources();
+
+                return;
+            }
+
+            channelCreationListener.handshakeFinished(nettySender);
+        });
+    }
+
+    private void releaseResources() {
+        assert ctx.executor().inEventLoop() : "Release resources called 
outside of event loop";
+
+        RecoveryDescriptor desc = recoveryDescriptor;
+
+        if (desc != null) {
+            desc.release(ctx);
+        }
     }
 
     /** {@inheritDoc} */
@@ -125,25 +148,7 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
     @Override
     public void onMessage(NetworkMessage message) {
         if (message instanceof HandshakeStartMessage) {
-            HandshakeStartMessage msg = (HandshakeStartMessage) message;
-
-            if (staleIdDetector.isIdStale(msg.launchId().toString())) {
-                handleStaleServerId(msg);
-
-                return;
-            }
-
-            this.remoteLaunchId = msg.launchId();
-            this.remoteConsistentId = msg.consistentId();
-
-            this.recoveryDescriptor = 
recoveryDescriptorProvider.getRecoveryDescriptor(
-                    remoteConsistentId,
-                    remoteLaunchId,
-                    connectionId,
-                    false
-            );
-
-            handshake(recoveryDescriptor);
+            onHandshakeStartMessage((HandshakeStartMessage) message);
 
             return;
         }
@@ -197,8 +202,49 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
         ctx.fireChannelRead(message);
     }
 
+    private void onHandshakeStartMessage(HandshakeStartMessage message) {
+        if (staleIdDetector.isIdStale(message.launchId().toString())) {
+            handleStaleServerId(message);
+
+            return;
+        }
+
+        this.remoteLaunchId = message.launchId();
+        this.remoteConsistentId = message.consistentId();
+
+        RecoveryDescriptor descriptor = 
recoveryDescriptorProvider.getRecoveryDescriptor(
+                remoteConsistentId,
+                remoteLaunchId,
+                connectionId
+        );
+
+        while (!descriptor.acquire(ctx)) {
+            if (shouldCloseChannel(remoteLaunchId, launchId)) {
+                Channel holderChannel = descriptor.holderChannel();
+
+                if (holderChannel == null) {
+                    continue;
+                }
+
+                holderChannel.close().awaitUninterruptibly();
+            } else {
+                String err = "Failed to acquire recovery descriptor during 
handshake, it is held by: " + descriptor.holderDescription();
+
+                LOG.info(err);
+
+                handshakeCompleteFuture.completeExceptionally(new 
ChannelAlreadyExistsException(remoteConsistentId));
+
+                return;
+            }
+        }
+
+        this.recoveryDescriptor = descriptor;
+
+        handshake(this.recoveryDescriptor);
+    }
+
     private void handleStaleServerId(HandshakeStartMessage msg) {
-        String reason = msg.launchId() + " is stale, server should be 
restarted so that clients can connect";
+        String reason = msg.consistentId() + ":" + msg.launchId() + " is 
stale, server should be restarted so that clients can connect";
         HandshakeRejectedMessage rejectionMessage = 
MESSAGE_FACTORY.handshakeRejectedMessage()
                 .reason(reason)
                 .build();
@@ -232,7 +278,7 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
                 .connectionId(connectionId)
                 .build();
 
-        ChannelFuture sendFuture = ctx.channel().writeAndFlush(new 
OutNetworkObject(response, Collections.emptyList(), false));
+        ChannelFuture sendFuture = ctx.channel().writeAndFlush(new 
OutNetworkObject(response, emptyList(), false));
 
         NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, 
throwable) -> {
             if (throwable != null) {
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
index 64e160183c..463f2ed49c 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
@@ -17,12 +17,16 @@
 
 package org.apache.ignite.internal.network.recovery;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.network.OutNetworkObject;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Recovery protocol descriptor.
@@ -40,6 +44,9 @@ public class RecoveryDescriptor {
     /** Count of received messages. */
     private long receivedCount;
 
+    /** Current owner channel of this descriptor. */
+    private final AtomicReference<Channel> channelHolder = new 
AtomicReference<>();
+
     /**
      * Constructor.
      *
@@ -124,4 +131,43 @@ public class RecoveryDescriptor {
     public String toString() {
         return S.toString(RecoveryDescriptor.class, this);
     }
+
+    /**
+     * Releases this descriptor if it is currently held by the channel of the given context; otherwise does nothing.
+     *
+     * @param ctx Channel handler context.
+     */
+    public void release(ChannelHandlerContext ctx) {
+        channelHolder.compareAndSet(ctx.channel(), null);
+    }
+
+    /**
+     * Tries to acquire this descriptor for the channel of the given context; succeeds only if no channel currently holds it.
+     *
+     * @param ctx Channel handler context.
+     */
+    public boolean acquire(ChannelHandlerContext ctx) {
+        return channelHolder.compareAndSet(null, ctx.channel());
+    }
+
+    /**
+     * Returns the channel that holds this descriptor, or {@code null} if it is not held.
+     */
+    @Nullable Channel holderChannel() {
+        return channelHolder.get();
+    }
+
+    /**
+     * Returns the {@code toString()} representation of the {@link Channel} that holds this descriptor, or "No channel" if it is not held.
+     */
+    String holderDescription() {
+        Channel channel = channelHolder.get();
+
+        if (channel == null) {
+            // This can happen if channel was already closed and it released 
the descriptor.
+            return "No channel";
+        }
+
+        return channel.toString();
+    }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorProvider.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorProvider.java
index b46a29ba2b..5603e18009 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorProvider.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorProvider.java
@@ -29,8 +29,7 @@ public interface RecoveryDescriptorProvider {
      * @param consistentId Remote node consistent id.
      * @param launchId Remote node launch id.
      * @param connectionIndex Connection id.
-     * @param inbound {@code true} if the connection is inbound, {@code false} 
otherwise.
      * @return Recovery descriptor.
      */
-    RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID 
launchId, short connectionIndex, boolean inbound);
+    RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID 
launchId, short connectionIndex);
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index 395de002b7..234f508dd0 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.network.recovery;
 
 import static java.util.Collections.emptyList;
+import static 
org.apache.ignite.internal.network.recovery.HandshakeTieBreaker.shouldCloseChannel;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.netty.ChannelCreationListener;
 import org.apache.ignite.internal.network.netty.HandshakeHandler;
 import org.apache.ignite.internal.network.netty.MessageHandler;
 import org.apache.ignite.internal.network.netty.NettySender;
@@ -106,13 +108,34 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
             String consistentId,
             NetworkMessagesFactory messageFactory,
             RecoveryDescriptorProvider recoveryDescriptorProvider,
-            StaleIdDetector staleIdDetector
+            StaleIdDetector staleIdDetector,
+            ChannelCreationListener channelCreationListener
     ) {
         this.launchId = launchId;
         this.consistentId = consistentId;
         this.messageFactory = messageFactory;
         this.recoveryDescriptorProvider = recoveryDescriptorProvider;
         this.staleIdDetector = staleIdDetector;
+
+        this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
+            if (throwable != null) {
+                releaseResources();
+
+                return;
+            }
+
+            channelCreationListener.handshakeFinished(nettySender);
+        });
+    }
+
+    private void releaseResources() {
+        assert ctx.executor().inEventLoop() : "Release resources called 
outside of event loop";
+
+        RecoveryDescriptor desc = recoveryDescriptor;
+
+        if (desc != null) {
+            desc.release(ctx);
+        }
     }
 
     /** {@inheritDoc} */
@@ -146,26 +169,7 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
     @Override
     public void onMessage(NetworkMessage message) {
         if (message instanceof HandshakeStartResponseMessage) {
-            HandshakeStartResponseMessage msg = 
(HandshakeStartResponseMessage) message;
-
-            if (staleIdDetector.isIdStale(msg.launchId().toString())) {
-                handleStaleClientId(msg);
-
-                return;
-            }
-
-            this.remoteLaunchId = msg.launchId();
-            this.remoteConsistentId = msg.consistentId();
-            this.receivedCount = msg.receivedCount();
-            this.remoteChannelId = msg.connectionId();
-
-            this.recoveryDescriptor = 
recoveryDescriptorProvider.getRecoveryDescriptor(
-                    remoteConsistentId,
-                    remoteLaunchId,
-                    remoteChannelId,
-                    true);
-
-            handshake(recoveryDescriptor);
+            onHandshakeStartResponseMessage((HandshakeStartResponseMessage) 
message);
 
             return;
         }
@@ -192,8 +196,56 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
         ctx.fireChannelRead(message);
     }
 
+    private void onHandshakeStartResponseMessage(HandshakeStartResponseMessage 
message) {
+        UUID remoteLaunchId = message.launchId();
+        String remoteConsistentId = message.consistentId();
+        long remoteReceivedCount = message.receivedCount();
+        short remoteChannelId = message.connectionId();
+
+        if (staleIdDetector.isIdStale(remoteLaunchId.toString())) {
+            handleStaleClientId(message);
+
+            return;
+        }
+
+        this.remoteLaunchId = remoteLaunchId;
+        this.remoteConsistentId = remoteConsistentId;
+        this.receivedCount = remoteReceivedCount;
+        this.remoteChannelId = remoteChannelId;
+
+        RecoveryDescriptor descriptor = 
recoveryDescriptorProvider.getRecoveryDescriptor(
+                this.remoteConsistentId,
+                this.remoteLaunchId,
+                this.remoteChannelId
+        );
+
+        while (!descriptor.acquire(ctx)) {
+            if (shouldCloseChannel(launchId, remoteLaunchId)) {
+                Channel holderChannel = descriptor.holderChannel();
+
+                if (holderChannel == null) {
+                    continue;
+                }
+
+                holderChannel.close().awaitUninterruptibly();
+            } else {
+                String err = "Failed to acquire recovery descriptor during 
handshake, it is held by: " + descriptor.holderDescription();
+
+                LOG.info(err);
+
+                handshakeCompleteFuture.completeExceptionally(new 
HandshakeException(err));
+
+                return;
+            }
+        }
+
+        this.recoveryDescriptor = descriptor;
+
+        handshake(descriptor);
+    }
+
     private void handleStaleClientId(HandshakeStartResponseMessage msg) {
-        String reason = msg.launchId() + " is stale, client should be 
restarted to be allowed to connect";
+        String reason = msg.consistentId() + ":" + msg.launchId() + " is 
stale, client should be restarted to be allowed to connect";
         HandshakeRejectedMessage rejectionMessage = 
messageFactory.handshakeRejectedMessage()
                 .reason(reason)
                 .build();
diff --git 
a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
 
b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 7ca5811863..d71b8d641b 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -28,6 +28,7 @@ import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -40,6 +41,7 @@ import org.apache.ignite.internal.future.OrderingFuture;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import 
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.message.InvokeRequest;
 import org.apache.ignite.internal.network.message.InvokeResponse;
@@ -282,8 +284,23 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
         }
 
         OrderingFuture<NettySender> channel = 
connectionManager.channel(consistentId, type, addr);
-        return channel
-                .thenComposeToCompletable(sender -> sender.send(new 
OutNetworkObject(message, descriptors)));
+
+        return channel.handle((sender, throwable) -> {
+            if (throwable != null) {
+                if (throwable instanceof CompletionException && 
throwable.getCause() instanceof ChannelAlreadyExistsException) {
+                    ChannelAlreadyExistsException e = 
(ChannelAlreadyExistsException) throwable.getCause();
+
+                    OrderingFuture<NettySender> channelFut = 
connectionManager.channel(e.consistentId(), type, addr);
+
+                    return channelFut.thenComposeToCompletable(nettySender -> {
+                        return nettySender.send(new OutNetworkObject(message, 
descriptors));
+                    });
+                }
+
+                throw new CompletionException(throwable);
+            }
+            return sender.send(new OutNetworkObject(message, descriptors));
+        }).thenComposeToCompletable(Function.identity());
     }
 
     private List<ClassDescriptorMessage> beforeRead(NetworkMessage msg) throws 
Exception {
@@ -490,4 +507,9 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
     public void stopDroppingMessages() {
         dropMessagesPredicate = null;
     }
+
+    @TestOnly
+    public ConnectionManager connectionManager() {
+        return connectionManager;
+    }
 }
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index bfad033576..ff9888edea 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -179,10 +179,7 @@ public class NettyServerTest {
         server = new NettyServer(
                 serverCfg.value(),
                 () -> handshakeManager,
-                sender -> {
-                },
-                (message) -> {
-                },
+                (message) -> {},
                 new SerializationService(registry, 
mock(UserObjectSerializationContext.class)),
                 bootstrapFactory
         );
@@ -219,8 +216,8 @@ public class NettyServerTest {
         InOrder order = Mockito.inOrder(handshakeManager);
 
         order.verify(handshakeManager, timeout()).onInit(any());
-        order.verify(handshakeManager, timeout()).handshakeFuture();
         order.verify(handshakeManager, timeout()).onConnectionOpen();
+        order.verify(handshakeManager, timeout()).handshakeFuture();
         order.verify(handshakeManager, timeout()).onMessage(any());
     }
 
@@ -248,7 +245,6 @@ public class NettyServerTest {
                 () -> mock(HandshakeManager.class),
                 null,
                 null,
-                null,
                 bootstrapFactory
         );
 
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index 31eabb3539..a83aa0975c 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -21,7 +21,6 @@ import static 
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializati
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -109,7 +108,7 @@ public class RecoveryHandshakeTest {
         RecoveryDescriptorProvider serverRecovery = 
createRecoveryDescriptorProvider();
 
         UUID clientLaunchId = UUID.randomUUID();
-        RecoveryDescriptor serverRecoveryDescriptor = 
serverRecovery.getRecoveryDescriptor("client", clientLaunchId, CONNECTION_ID, 
true);
+        RecoveryDescriptor serverRecoveryDescriptor = 
serverRecovery.getRecoveryDescriptor("client", clientLaunchId, CONNECTION_ID);
         addUnacknowledgedMessages(serverRecoveryDescriptor);
 
         RecoveryClientHandshakeManager clientHandshakeManager = 
createRecoveryClientHandshakeManager("client", clientLaunchId,
@@ -162,7 +161,7 @@ public class RecoveryHandshakeTest {
         RecoveryDescriptorProvider serverRecovery = 
createRecoveryDescriptorProvider();
 
         UUID serverLaunchId = UUID.randomUUID();
-        RecoveryDescriptor clientRecoveryDescriptor = 
clientRecovery.getRecoveryDescriptor("server", serverLaunchId, CONNECTION_ID, 
false);
+        RecoveryDescriptor clientRecoveryDescriptor = 
clientRecovery.getRecoveryDescriptor("server", serverLaunchId, CONNECTION_ID);
         addUnacknowledgedMessages(clientRecoveryDescriptor);
 
         RecoveryClientHandshakeManager clientHandshakeManager = 
createRecoveryClientHandshakeManager(clientRecovery);
@@ -214,8 +213,8 @@ public class RecoveryHandshakeTest {
         RecoveryDescriptorProvider node1Recovery = 
createRecoveryDescriptorProvider();
         RecoveryDescriptorProvider node2Recovery = 
createRecoveryDescriptorProvider();
 
-        UUID node1Uuid = UUID.randomUUID();
-        UUID node2Uuid = UUID.randomUUID();
+        UUID node1Uuid = new UUID(100, 200);
+        UUID node2Uuid = new UUID(300, 400);
 
         RecoveryClientHandshakeManager chm1 = 
createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
         RecoveryServerHandshakeManager shm1 = 
createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
@@ -223,27 +222,35 @@ public class RecoveryHandshakeTest {
         RecoveryClientHandshakeManager chm2 = 
createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
         RecoveryServerHandshakeManager shm2 = 
createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
 
-        EmbeddedChannel out1to2 = setupChannel(chm1, noMessageListener);
-        EmbeddedChannel in1to2 = setupChannel(shm1, noMessageListener);
-        EmbeddedChannel out2to1 = setupChannel(chm2, noMessageListener);
-        EmbeddedChannel in2to1 = setupChannel(shm2, noMessageListener);
+        // Channel opened from node1 to node2 - channel 1.
+        // Channel opened from node2 to node1 - channel 2.
 
-        exchangeServerToClient(in1to2, out2to1);
-        exchangeServerToClient(in2to1, out1to2);
+        // Channel 1.
+        EmbeddedChannel channel1Src = setupChannel(chm1, noMessageListener);
+        EmbeddedChannel channel1Dst = setupChannel(shm2, noMessageListener);
 
-        exchangeClientToServer(in1to2, out2to1);
-        exchangeClientToServer(in2to1, out1to2);
+        // Channel 2.
+        EmbeddedChannel channel2Src = setupChannel(chm2, noMessageListener);
+        EmbeddedChannel channel2Dst = setupChannel(shm1, noMessageListener);
 
-        exchangeServerToClient(in1to2, out2to1);
-        exchangeServerToClient(in2to1, out1to2);
+        exchangeServerToClient(channel2Dst, channel2Src);
+        exchangeServerToClient(channel1Dst, channel1Src);
 
-        assertNotSame(chm1.recoveryDescriptor(), shm1.recoveryDescriptor());
-        assertNotSame(chm2.recoveryDescriptor(), shm2.recoveryDescriptor());
+        exchangeClientToServer(channel2Dst, channel2Src);
+        exchangeClientToServer(channel1Dst, channel1Src);
 
-        assertFalse(out1to2.finish());
-        assertFalse(in1to2.finish());
-        assertFalse(out2to1.finish());
-        assertFalse(in2to1.finish());
+        // Channel 2 (node2 -> node1) stays open, while channel 1 (node1 -> node2) is closed because of the tie-breaking.
+        exchangeServerToClient(channel2Dst, channel2Src);
+        assertFalse(channel1Src.isOpen());
+        assertFalse(channel1Dst.isOpen());
+
+        assertTrue(channel2Src.isOpen());
+        assertTrue(channel2Dst.isOpen());
+
+        assertFalse(channel1Src.finish());
+        assertFalse(channel2Dst.finish());
+        assertFalse(channel2Src.finish());
+        assertFalse(channel1Dst.finish());
     }
 
     @Test
@@ -534,7 +541,7 @@ public class RecoveryHandshakeTest {
 
     private RecoveryClientHandshakeManager 
createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
             RecoveryDescriptorProvider provider, StaleIdDetector 
staleIdDetector) {
-        return new RecoveryClientHandshakeManager(launchId, consistentId, 
CONNECTION_ID, provider, staleIdDetector);
+        return new RecoveryClientHandshakeManager(launchId, consistentId, 
CONNECTION_ID, provider, staleIdDetector, channel -> {});
     }
 
     private RecoveryServerHandshakeManager 
createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
@@ -548,7 +555,7 @@ public class RecoveryHandshakeTest {
 
     private RecoveryServerHandshakeManager 
createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
             RecoveryDescriptorProvider provider, StaleIdDetector 
staleIdDetector) {
-        return new RecoveryServerHandshakeManager(launchId, consistentId, 
MESSAGE_FACTORY, provider, staleIdDetector);
+        return new RecoveryServerHandshakeManager(launchId, consistentId, 
MESSAGE_FACTORY, provider, staleIdDetector, channel -> {});
     }
 
     private RecoveryDescriptorProvider createRecoveryDescriptorProvider() {
diff --git 
a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
 
b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
index 7a2327617a..cb1736b073 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -325,7 +325,8 @@ class DefaultMessagingServiceTest {
                         consistentId,
                         connectionId,
                         recoveryDescriptorProvider,
-                        staleIdDetector
+                        staleIdDetector,
+                        channel -> {}
                 ) {
                     @Override
                     protected void finishHandshake() {
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index ab5091934d..a3a707c84a 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -243,7 +243,6 @@ public class LeaseTracker implements PlacementDriver {
                 return completedFuture(lease);
             }
 
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-19532 Race 
between meta storage safe time publication and listeners.
             return 
msManager.clusterTime().waitFor(timestamp).thenApply(ignored -> inBusyLock(
                     busyLock, () -> {
                         Lease lease0 = 
leasesMap.getOrDefault(replicationGroupId, EMPTY_LEASE);

Reply via email to