This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d873ebabc6 IGNITE-18712 Do not allow a node excluded from Physical Topology to enter the topology again (#1978)
d873ebabc6 is described below

commit d873ebabc67a4b98c7474fc37303432009ebe68a
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Sun Apr 30 13:51:09 2023 +0400

    IGNITE-18712 Do not allow a node excluded from Physical Topology to enter the topology again (#1978)
---
 build.gradle                                       |   5 +
 gradle/libs.versions.toml                          |   6 +-
 .../org/apache/ignite/network/ClusterNode.java     |   5 +-
 .../management/ClusterManagementGroupManager.java  |  30 +----
 .../impl/ItMetaStorageMultipleNodesTest.java       |  33 ------
 modules/network/build.gradle                       |   1 +
 .../network/netty/ItConnectionManagerTest.java     |   4 +-
 .../scalecube/ItScaleCubeNetworkMessagingTest.java | 100 ++++++++++++++++-
 .../internal/network/NetworkMessageTypes.java      |  14 ++-
 .../internal/network/netty/ConnectionManager.java  |  24 +++-
 .../netty/DefaultRecoveryDescriptorProvider.java   |   2 +-
 .../internal/network/netty/HandshakeHandler.java   |   8 +-
 .../internal/network/recovery/FailureHandler.java  |  33 ++++++
 .../network/recovery/InMemoryStaleIds.java         |  39 +++++++
 .../recovery/RecoveryClientHandshakeManager.java   |  56 +++++++++-
 .../recovery/RecoveryServerHandshakeManager.java   |  60 +++++++++-
 .../internal/network/recovery/StaleIdDetector.java |  33 ++++++
 .../ignite/internal/network/recovery/StaleIds.java |  32 ++++++
 .../internal/network/recovery/VaultStateIds.java   | 104 +++++++++++++++++
 .../recovery/message/HandshakeRejectedMessage.java |  38 +++++++
 .../scalecube/ScaleCubeClusterServiceFactory.java  |  23 +++-
 .../network/netty/RecoveryHandshakeTest.java       |  98 +++++++++++++++-
 .../network/recovery/VaultStateIdsTest.java        | 124 +++++++++++++++++++++
 .../network/DefaultMessagingServiceTest.java       |  25 ++++-
 .../internal/network/recovery/AllIdsAreFresh.java  |  28 +++++
 .../internal/network/recovery/AllIdsAreStale.java  |  28 +++++
 .../TestScaleCubeClusterServiceFactory.java        |   1 -
 .../ignite/utils/ClusterServiceTestUtils.java      |  34 +++++-
 .../raft/ItTruncateSuffixAndRestartTest.java       |   4 +-
 .../internal/compute/ItLogicalTopologyTest.java    |   5 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   4 +-
 ...ItTxDistributedTestThreeNodesThreeReplicas.java |  11 +-
 33 files changed, 907 insertions(+), 109 deletions(-)

diff --git a/build.gradle b/build.gradle
index 30c5f9166f..0b18987366 100644
--- a/build.gradle
+++ b/build.gradle
@@ -119,6 +119,11 @@ subprojects {
                 releasesOnly()
             }
         }
+
+        maven {
+            // TODO: IGNITE-19386 - switch to io.scalecube:scalecube-cluster
+            url "https://jitpack.io";
+        }
     }
 
     tasks.register('printSubDependencies', DependencyReportTask)
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index d4ed85141c..aab1b13919 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -50,7 +50,8 @@ typesafe = "1.4.1"
 hamcrest = "2.2"
 hamcrestOptional = "2.0.0"
 hamcrestPath = "1.0.1"
-scalecube = "2.6.12"
+# TODO: IGNITE-19386 - switch to io.scalecube:scalecube-cluster
+scalecube = "2.6.12-patch5"
 calcite = "1.34.0"
 value = "2.8.8"
 janino = "3.1.9"
@@ -194,7 +195,8 @@ spoon-core = { module = "fr.inria.gforge.spoon:spoon-core", 
version.ref = "spoon
 
 fastutil-core = { module = "it.unimi.dsi:fastutil-core", version.ref = 
"fastutil" }
 
-scalecube-cluster = { module = "io.scalecube:scalecube-cluster", version.ref = 
"scalecube" }
+# TODO: IGNITE-19386 - switch to io.scalecube:scalecube-cluster
+scalecube-cluster = { module = 
"com.github.rpuch.scalecube-cluster-parent:scalecube-cluster", version.ref = 
"scalecube" }
 
 kryo = { module = "com.esotericsoftware:kryo", version.ref = "kryo" }
 
diff --git 
a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java 
b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
index 4d4019132a..dcbe92017f 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
@@ -61,10 +61,7 @@ public class ClusterNode implements Serializable {
      * @param address Node address.
      */
     public ClusterNode(String id, String name, NetworkAddress address) {
-        this.id = id;
-        this.name = name;
-        this.address = address;
-        this.nodeMetadata = null;
+        this(id, name, address, null);
     }
 
     /**
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 155b100a17..93a4bf7352 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -138,20 +138,6 @@ public class ClusterManagementGroupManager implements 
IgniteComponent {
     /** Node's attributes configuration. */
     private final NodeAttributesConfiguration nodeAttributes;
 
-    /**
-     * Whether we attempted to complete join (i.e. send JoinReady command) on 
Ignite node start.
-     *
-     * <p>Such join completion always happens during a start, and it is always 
the last step during the startup process,
-     * to make sure a node joins the cluster when it's fully ready.
-     *
-     * <p>We need this flag to make sure we handle automatic rejoins 
correctly. If a short network hiccup happens, CMG leader
-     * might lose our node of sight, hence the node will be removed from 
physical and then from logical topologies. When the network
-     * connectivity is restored, the node will appear in the physical 
topology, after which it will try to rejoin the cluster. If such
-     * 'rejoin' was carried out unconditionally, it could happen before the 
first join during startup, so a not-yet-ready node could join
-     * the cluster.
-     */
-    private volatile boolean attemptedCompleteJoinOnStart = false;
-
     /** Constructor. */
     public ClusterManagementGroupManager(
             VaultManager vault,
@@ -522,7 +508,7 @@ public class ClusterManagementGroupManager implements 
IgniteComponent {
                             if (service != null && 
service.nodeNames().equals(state.cmgNodes())) {
                                 LOG.info("ClusterStateMessage received, but 
the CMG service is already started");
 
-                                return joinCluster(service, 
state.clusterTag());
+                                return completedFuture(service);
                             }
 
                             if (service == null) {
@@ -550,21 +536,11 @@ public class ClusterManagementGroupManager implements 
IgniteComponent {
 
                             return initCmgRaftService(state);
                         })
-                        .thenCompose(Function.identity())
-                        .thenCompose(this::completeJoinIfTryingToRejoin);
+                        .thenCompose(Function.identity());
             }
         }
     }
 
-    private CompletableFuture<CmgRaftService> 
completeJoinIfTryingToRejoin(CmgRaftService cmgRaftService) {
-        if (attemptedCompleteJoinOnStart) {
-            return cmgRaftService.completeJoinCluster(mapNodeAttributes())
-                    .thenApply(unused -> cmgRaftService);
-        } else {
-            return completedFuture(cmgRaftService);
-        }
-    }
-
     private CompletableFuture<CmgRaftService> joinCluster(CmgRaftService 
service, ClusterTag clusterTag) {
         return service.startJoinCluster(clusterTag, mapNodeAttributes())
                 .thenApply(v -> service)
@@ -860,8 +836,6 @@ public class ClusterManagementGroupManager implements 
IgniteComponent {
             return failedFuture(new NodeStoppingException());
         }
 
-        attemptedCompleteJoinOnStart = true;
-
         try {
             return raftServiceAfterJoin().thenCompose(svc -> 
svc.completeJoinCluster(mapNodeAttributes()));
         } finally {
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
index 41c9953828..a094d4f880 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
@@ -61,7 +61,6 @@ import org.apache.ignite.internal.metastorage.WatchListener;
 import 
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
 import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
-import org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.RaftNodeId;
@@ -355,38 +354,6 @@ public class ItMetaStorageMultipleNodesTest extends 
IgniteAbstractTest {
         assertTrue(waitForCondition(() -> 
firstNode.getMetaStorageLearners().join().isEmpty(), 10_000));
     }
 
-    /**
-     * Tests a scenario when a node gets kicked out of the Logical Topology 
due to a network partition. It should then be able to join
-     * the Meta Storage Raft group successfully.
-     */
-    @Test
-    void testLearnerLeaveAndJoinBecauseOfNetworkPartition(TestInfo testInfo) 
throws Exception {
-        Node firstNode = startNode(testInfo);
-        Node secondNode = startNode(testInfo);
-
-        firstNode.cmgManager.initCluster(List.of(firstNode.name()), 
List.of(firstNode.name()), "test");
-
-        assertThat(allOf(firstNode.cmgManager.onJoinReady(), 
secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
-
-        CompletableFuture<Set<String>> logicalTopologyNodes = 
firstNode.cmgManager
-                .logicalTopology()
-                .thenApply(logicalTopology -> 
logicalTopology.nodes().stream().map(ClusterNode::name).collect(toSet()));
-
-        assertThat(logicalTopologyNodes, willBe(Set.of(firstNode.name(), 
secondNode.name())));
-
-        assertTrue(waitForCondition(() -> 
firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 
10_000));
-
-        // Make first node lose the second node from the Physical and Logical 
topologies.
-        firstNode.startDroppingMessagesTo(secondNode, ScaleCubeMessage.class);
-
-        assertTrue(waitForCondition(() -> 
firstNode.getMetaStorageLearners().join().isEmpty(), 10_000));
-
-        // Make the first node discover the second node again. The second node 
should be added as a Meta Storage Learner again.
-        firstNode.stopDroppingMessages();
-
-        assertTrue(waitForCondition(() -> 
firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 
10_000));
-    }
-
     /**
      * Tests that safe time is propagated from the leader to the 
follower/learner.
      */
diff --git a/modules/network/build.gradle b/modules/network/build.gradle
index 27f5816aa5..2a1094850b 100644
--- a/modules/network/build.gradle
+++ b/modules/network/build.gradle
@@ -29,6 +29,7 @@ dependencies {
 
     implementation project(':ignite-configuration-api')
     implementation project(':ignite-core')
+    implementation project(':ignite-vault')
     implementation libs.jetbrains.annotations
     implementation libs.scalecube.cluster
     implementation libs.fastutil.core
diff --git 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index bfd7eace45..9a50a77c68 100644
--- 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -48,6 +48,7 @@ import 
org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.configuration.NetworkView;
 import org.apache.ignite.internal.network.messages.TestMessage;
 import org.apache.ignite.internal.network.messages.TestMessagesFactory;
+import org.apache.ignite.internal.network.recovery.AllIdsAreFresh;
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import 
org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -341,7 +342,8 @@ public class ItConnectionManagerTest {
                     new SerializationService(registry, 
mock(UserObjectSerializationContext.class)),
                     launchId,
                     consistentId,
-                    bootstrapFactory
+                    bootstrapFactory,
+                    new AllIdsAreFresh()
             );
 
             manager.start();
diff --git 
a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
 
b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index ec177531b7..d9e86b4417 100644
--- 
a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++ 
b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.network.scalecube;
 
+import static java.util.stream.Collectors.toUnmodifiableList;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -35,22 +36,30 @@ import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
+import java.util.logging.Handler;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
 import org.apache.ignite.internal.network.NetworkMessageTypes;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
 import org.apache.ignite.internal.network.messages.TestMessage;
 import org.apache.ignite.internal.network.messages.TestMessageTypes;
 import org.apache.ignite.internal.network.messages.TestMessagesFactory;
+import 
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import 
org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.DefaultMessagingService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.NodeFinder;
@@ -320,6 +329,93 @@ class ItScaleCubeNetworkMessagingTest {
         assertThat(networkMessageFuture, willBe(equalTo(networkMessage)));
     }
 
+    /**
+     * Makes sure that a node that dropped out from the Physical Topology 
cannot reappear with same ID.
+     *
+     * @throws Exception in case of errors.
+     */
+    @SuppressWarnings("ConstantConditions")
+    @Test
+    public void nodeCannotReuseOldId(TestInfo testInfo) throws Exception {
+        testCluster = new Cluster(3, testInfo);
+
+        testCluster.startAwait();
+
+        String outcastName = 
testCluster.members.get(testCluster.members.size() - 1).nodeName();
+
+        knockOutNode(outcastName);
+
+        IgniteBiTuple<CountDownLatch, AtomicBoolean> pair = 
reanimateNode(outcastName);
+        CountDownLatch ready = pair.get1();
+        AtomicBoolean reappeared = pair.get2();
+
+        assertTrue(ready.await(10, TimeUnit.SECONDS), "Node neither 
reappeared, not was rejected");
+
+        assertThat(reappeared.get(), is(false));
+    }
+
+    private void knockOutNode(String outcastName) throws InterruptedException {
+        CountDownLatch disappeared = new CountDownLatch(1);
+
+        testCluster.members.get(0).topologyService().addEventHandler(new 
TopologyEventHandler() {
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                if (Objects.equals(member.name(), outcastName)) {
+                    disappeared.countDown();
+                }
+            }
+        });
+
+        testCluster.members.stream()
+                .filter(service -> !outcastName.equals(service.nodeName()))
+                .forEach(service -> {
+                    DefaultMessagingService messagingService = 
(DefaultMessagingService) service.messagingService();
+                    messagingService.dropMessages((recipientConsistentId, 
message) -> outcastName.equals(recipientConsistentId));
+                });
+
+        assertTrue(disappeared.await(10, TimeUnit.SECONDS), "Node did not 
disappear in time");
+    }
+
+    private IgniteBiTuple<CountDownLatch, AtomicBoolean> reanimateNode(String 
outcastName) {
+        CountDownLatch ready = new CountDownLatch(1);
+        AtomicBoolean reappeared = new AtomicBoolean(false);
+
+        testCluster.members.get(0).topologyService().addEventHandler(new 
TopologyEventHandler() {
+            @Override
+            public void onAppeared(ClusterNode member) {
+                if (Objects.equals(member.name(), outcastName)) {
+                    reappeared.compareAndSet(false, true);
+
+                    ready.countDown();
+                }
+            }
+        });
+
+        Handler rejectedHandshakeHandler = new NoOpHandler() {
+            @Override
+            public void publish(LogRecord record) {
+                if (record.getMessage().startsWith("Handshake rejected by ")) {
+                    ready.countDown();
+                }
+            }
+        };
+
+        Logger clientHandshakeManagerLogger = 
Logger.getLogger(RecoveryClientHandshakeManager.class.getName());
+        clientHandshakeManagerLogger.addHandler(rejectedHandshakeHandler);
+
+        Logger serverHandshakeManagerLogger = 
Logger.getLogger(RecoveryServerHandshakeManager.class.getName());
+        serverHandshakeManagerLogger.addHandler(rejectedHandshakeHandler);
+
+        testCluster.members.stream()
+                .filter(service -> !outcastName.equals(service.nodeName()))
+                .forEach(service -> {
+                    DefaultMessagingService messagingService = 
(DefaultMessagingService) service.messagingService();
+                    messagingService.stopDroppingMessages();
+                });
+
+        return new IgniteBiTuple<>(ready, reappeared);
+    }
+
     /**
      * Tests shutdown.
      *
@@ -420,7 +516,7 @@ class ItScaleCubeNetworkMessagingTest {
 
             members = addresses.stream()
                     .map(addr -> startNode(testInfo, addr, 
isInitial.getAndSet(false)))
-                    .collect(Collectors.toUnmodifiableList());
+                    .collect(toUnmodifiableList());
         }
 
         /**
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
index 5aba9a2377..2ffd1220c1 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
@@ -25,6 +25,7 @@ import 
org.apache.ignite.internal.network.message.InvokeResponse;
 import org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import 
org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
+import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
 import org.apache.ignite.network.annotations.MessageGroup;
@@ -64,23 +65,28 @@ public class NetworkMessageTypes {
      */
     public static final short HANDSHAKE_FINISH = 5;
 
+    /**
+     * Type for {@link HandshakeRejectedMessage}.
+     */
+    public static final short HANDSHAKE_REJECTED = 6;
+
     /**
      * Type for {@link AcknowledgementMessage}.
      */
-    public static final short ACKNOWLEDGEMENT = 6;
+    public static final short ACKNOWLEDGEMENT = 7;
 
     /**
      * Type for {@link ClassDescriptorMessage}.
      */
-    public static final short CLASS_DESCRIPTOR_MESSAGE = 7;
+    public static final short CLASS_DESCRIPTOR_MESSAGE = 8;
 
     /**
      * Type for {@link FieldDescriptorMessage}.
      */
-    public static final short FIELD_DESCRIPTOR_MESSAGE = 8;
+    public static final short FIELD_DESCRIPTOR_MESSAGE = 9;
 
     /**
      * Type for {@link ClassDescriptorListMessage}.
      */
-    public static final short CLASS_DESCRIPTOR_LIST_MESSAGE = 9;
+    public static final short CLASS_DESCRIPTOR_LIST_MESSAGE = 10;
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 66a5ea74b6..371c295e48 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -43,6 +43,7 @@ import 
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManage
 import 
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManagerFactory;
 import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
 import 
org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.network.recovery.StaleIdDetector;
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ChannelType;
@@ -87,6 +88,9 @@ public class ConnectionManager {
     /** Node launch id. As opposed to {@link #consistentId}, this identifier 
changes between restarts. */
     private final UUID launchId;
 
+    /** Used to detect that a peer uses a stale ID. */
+    private final StaleIdDetector staleIdDetector;
+
     /** Factory producing {@link RecoveryClientHandshakeManager} instances. */
     private final RecoveryClientHandshakeManagerFactory 
clientHandshakeManagerFactory;
 
@@ -110,13 +114,15 @@ public class ConnectionManager {
      * @param launchId                      Launch id of this node.
      * @param consistentId                  Consistent id of this node.
      * @param bootstrapFactory              Bootstrap factory.
+     * @param staleIdDetector               Detects stale member IDs.
      */
     public ConnectionManager(
             NetworkView networkConfiguration,
             SerializationService serializationService,
             UUID launchId,
             String consistentId,
-            NettyBootstrapFactory bootstrapFactory
+            NettyBootstrapFactory bootstrapFactory,
+            StaleIdDetector staleIdDetector
     ) {
         this(
                 networkConfiguration,
@@ -124,7 +130,8 @@ public class ConnectionManager {
                 launchId,
                 consistentId,
                 bootstrapFactory,
-                new DefaultRecoveryClientHandshakeManagerFactory()
+                staleIdDetector,
+                new 
DefaultRecoveryClientHandshakeManagerFactory(staleIdDetector)
         );
     }
 
@@ -136,6 +143,7 @@ public class ConnectionManager {
      * @param launchId                      Launch id of this node.
      * @param consistentId                  Consistent id of this node.
      * @param bootstrapFactory              Bootstrap factory.
+     * @param staleIdDetector               Detects stale member IDs.
      * @param clientHandshakeManagerFactory Factory for {@link 
RecoveryClientHandshakeManager} instances.
      */
     public ConnectionManager(
@@ -144,11 +152,13 @@ public class ConnectionManager {
             UUID launchId,
             String consistentId,
             NettyBootstrapFactory bootstrapFactory,
+            StaleIdDetector staleIdDetector,
             RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactory
     ) {
         this.serializationService = serializationService;
         this.launchId = launchId;
         this.consistentId = consistentId;
+        this.staleIdDetector = staleIdDetector;
         this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
         this.networkConfiguration = networkConfiguration;
 
@@ -343,7 +353,7 @@ public class ConnectionManager {
     }
 
     private HandshakeManager createServerHandshakeManager() {
-        return new RecoveryServerHandshakeManager(launchId, consistentId, 
FACTORY, descriptorProvider);
+        return new RecoveryServerHandshakeManager(launchId, consistentId, 
FACTORY, descriptorProvider, staleIdDetector);
     }
 
     /**
@@ -384,13 +394,19 @@ public class ConnectionManager {
      * Factory producing vanilla {@link RecoveryClientHandshakeManager} 
instances.
      */
     private static class DefaultRecoveryClientHandshakeManagerFactory 
implements RecoveryClientHandshakeManagerFactory {
+        private final StaleIdDetector staleIdDetector;
+
+        private DefaultRecoveryClientHandshakeManagerFactory(StaleIdDetector 
staleIdDetector) {
+            this.staleIdDetector = staleIdDetector;
+        }
+
         @Override
         public RecoveryClientHandshakeManager create(UUID launchId,
                 String consistentId,
                 short connectionId,
                 RecoveryDescriptorProvider recoveryDescriptorProvider
         ) {
-            return new RecoveryClientHandshakeManager(launchId, consistentId, 
connectionId, recoveryDescriptorProvider);
+            return new RecoveryClientHandshakeManager(launchId, consistentId, 
connectionId, recoveryDescriptorProvider, staleIdDetector);
         }
     }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
index d04caf0a72..e8ffa5b042 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
@@ -95,7 +95,7 @@ public class DefaultRecoveryDescriptorProvider implements 
RecoveryDescriptorProv
         public int hashCode() {
             int result = consistentId.hashCode();
             result = 31 * result + launchId.hashCode();
-            result = 31 * result + (int) connectionId;
+            result = 31 * result + connectionId;
             result = 31 * result + (inbound ? 1 : 0);
             return result;
         }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
index b887fc21a5..83b60e4834 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
@@ -67,7 +67,13 @@ public class HandshakeHandler extends 
ChannelInboundHandlerAdapter {
     /** {@inheritDoc} */
     @Override
     public void channelActive(ChannelHandlerContext ctx) {
-        manager.onConnectionOpen();
+        try {
+            manager.onConnectionOpen();
+        } catch (Throwable e) {
+            LOG.error("Error in onConnectionOpen()", e);
+
+            throw e;
+        }
 
         manager.handshakeFuture().whenComplete((unused, throwable) -> {
             if (throwable != null) {
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/FailureHandler.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/FailureHandler.java
new file mode 100644
index 0000000000..fe83e445b8
--- /dev/null
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/FailureHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+
+/**
+ * A placeholder 'failure handler' to make it easier to recall where a real 
failure handler should be used.
+ */
+// TODO: IGNITE-16899 Replace with a real FailureHandler
+class FailureHandler {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FailureHandler.class);
+
+    void handleFailure(Throwable t) {
+        LOG.error("Critical failure", t);
+    }
+}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/InMemoryStaleIds.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/InMemoryStaleIds.java
new file mode 100644
index 0000000000..7cb3aedf44
--- /dev/null
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/InMemoryStaleIds.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * {@link StaleIds} implementation that keeps the stale IDs only in this object's memory; nothing is persisted.
+ *
+ * <p>The backing set is concurrent, so the instance may be used from multiple threads. IDs are never removed, so the
+ * set grows with every newly marked ID.
+ */
+public class InMemoryStaleIds implements StaleIds {
+    /** IDs that have been marked as stale. */
+    private final Set<String> staleIdSet = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
+    @Override
+    public boolean isIdStale(String nodeId) {
+        return staleIdSet.contains(nodeId);
+    }
+
+    @Override
+    public void markAsStale(String nodeId) {
+        staleIdSet.add(nodeId);
+    }
+}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 8925b19834..cefc0a16a1 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.network.recovery;
 
+import static java.util.Collections.emptyList;
+
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
@@ -24,6 +26,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
@@ -33,8 +37,10 @@ import org.apache.ignite.internal.network.netty.NettySender;
 import org.apache.ignite.internal.network.netty.NettyUtils;
 import org.apache.ignite.internal.network.netty.PipelineUtils;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
+import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.OutNetworkObject;
 import org.jetbrains.annotations.TestOnly;
@@ -43,6 +49,8 @@ import org.jetbrains.annotations.TestOnly;
  * Recovery protocol handshake manager for a client.
  */
 public class RecoveryClientHandshakeManager implements HandshakeManager {
+    private static final IgniteLogger LOG = 
Loggers.forClass(RecoveryClientHandshakeManager.class);
+
     /** Message factory. */
     private static final NetworkMessagesFactory MESSAGE_FACTORY = new 
NetworkMessagesFactory();
 
@@ -55,6 +63,9 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
     /** Recovery descriptor provider. */
     private final RecoveryDescriptorProvider recoveryDescriptorProvider;
 
+    /** Used to detect that a peer uses a stale ID. */
+    private final StaleIdDetector staleIdDetector;
+
     /** Connection id. */
     private final short connectionId;
 
@@ -79,6 +90,8 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
     /** Recovery descriptor. */
     private RecoveryDescriptor recoveryDescriptor;
 
+    private final FailureHandler failureHandler = new FailureHandler();
+
     /**
      * Constructor.
      *
@@ -90,11 +103,14 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
             UUID launchId,
             String consistentId,
             short connectionId,
-            RecoveryDescriptorProvider recoveryDescriptorProvider) {
+            RecoveryDescriptorProvider recoveryDescriptorProvider,
+            StaleIdDetector staleIdDetector
+    ) {
         this.launchId = launchId;
         this.consistentId = consistentId;
         this.connectionId = connectionId;
         this.recoveryDescriptorProvider = recoveryDescriptorProvider;
+        this.staleIdDetector = staleIdDetector;
     }
 
     /** {@inheritDoc} */
@@ -111,6 +127,12 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
         if (message instanceof HandshakeStartMessage) {
             HandshakeStartMessage msg = (HandshakeStartMessage) message;
 
+            if (staleIdDetector.isIdStale(msg.launchId().toString())) {
+                handleStaleServerId(msg);
+
+                return;
+            }
+
             this.remoteLaunchId = msg.launchId();
             this.remoteConsistentId = msg.consistentId();
 
@@ -126,6 +148,19 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
             return;
         }
 
+        if (message instanceof HandshakeRejectedMessage) {
+            HandshakeRejectedMessage msg = (HandshakeRejectedMessage) message;
+
+            LOG.warn("Handshake rejected by server: {}", msg.reason());
+
+            handshakeCompleteFuture.completeExceptionally(new 
HandshakeException(msg.reason()));
+
+            // TODO: IGNITE-16899 Perhaps we need to fail the node by 
FailureHandler
+            failureHandler.handleFailure(new IgniteException("Handshake 
rejected by server: " + msg.reason()));
+
+            return;
+        }
+
         assert recoveryDescriptor != null : "Wrong client handshake flow";
 
         if (message instanceof HandshakeFinishMessage) {
@@ -162,6 +197,25 @@ public class RecoveryClientHandshakeManager implements 
HandshakeManager {
         ctx.fireChannelRead(message);
     }
 
+    /**
+     * Rejects the handshake started by a server whose launch ID is stale. It sends a {@link HandshakeRejectedMessage}
+     * to the server, then fails the handshake future. If the rejection could not be written, the future fails with the
+     * send error; otherwise it fails with the rejection reason.
+     *
+     * <p>NOTE(review): the channel is not closed here; presumably whoever observes the failed handshake future
+     * closes it — confirm.
+     *
+     * @param msg Handshake start message received from the server.
+     */
+    private void handleStaleServerId(HandshakeStartMessage msg) {
+        String reason = msg.launchId() + " is stale, server should be restarted so that clients can connect";
+
+        HandshakeRejectedMessage rejection = MESSAGE_FACTORY.handshakeRejectedMessage().reason(reason).build();
+
+        ChannelFuture writeFuture = channel.writeAndFlush(new OutNetworkObject(rejection, emptyList(), false));
+
+        NettyUtils.toCompletableFuture(writeFuture).whenComplete((ignored, sendError) -> {
+            HandshakeException failure;
+
+            if (sendError == null) {
+                failure = new HandshakeException(reason);
+            } else {
+                failure = new HandshakeException(
+                        "Failed to send handshake rejected message: " + sendError.getMessage(),
+                        sendError
+                );
+            }
+
+            handshakeCompleteFuture.completeExceptionally(failure);
+        });
+    }
+
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<NettySender> handshakeFuture() {
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index 54501be1d8..395de002b7 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -17,13 +17,16 @@
 
 package org.apache.ignite.internal.network.recovery;
 
+import static java.util.Collections.emptyList;
+
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
-import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
@@ -33,8 +36,10 @@ import org.apache.ignite.internal.network.netty.NettySender;
 import org.apache.ignite.internal.network.netty.NettyUtils;
 import org.apache.ignite.internal.network.netty.PipelineUtils;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
+import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.OutNetworkObject;
 import org.jetbrains.annotations.TestOnly;
@@ -43,6 +48,8 @@ import org.jetbrains.annotations.TestOnly;
  * Recovery protocol handshake manager for a server.
  */
 public class RecoveryServerHandshakeManager implements HandshakeManager {
+    private static final IgniteLogger LOG = 
Loggers.forClass(RecoveryServerHandshakeManager.class);
+
     /** Launch id. */
     private final UUID launchId;
 
@@ -78,9 +85,14 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
     /** Recovery descriptor provider. */
     private final RecoveryDescriptorProvider recoveryDescriptorProvider;
 
+    /** Used to detect that a peer uses a stale ID. */
+    private final StaleIdDetector staleIdDetector;
+
     /** Recovery descriptor. */
     private RecoveryDescriptor recoveryDescriptor;
 
+    private final FailureHandler failureHandler = new FailureHandler();
+
     /**
      * Constructor.
      *
@@ -93,12 +105,14 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
             UUID launchId,
             String consistentId,
             NetworkMessagesFactory messageFactory,
-            RecoveryDescriptorProvider recoveryDescriptorProvider
+            RecoveryDescriptorProvider recoveryDescriptorProvider,
+            StaleIdDetector staleIdDetector
     ) {
         this.launchId = launchId;
         this.consistentId = consistentId;
         this.messageFactory = messageFactory;
         this.recoveryDescriptorProvider = recoveryDescriptorProvider;
+        this.staleIdDetector = staleIdDetector;
     }
 
     /** {@inheritDoc} */
@@ -117,7 +131,7 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
                 .consistentId(consistentId)
                 .build();
 
-        ChannelFuture sendFuture = channel.writeAndFlush(new 
OutNetworkObject(handshakeStartMessage, Collections.emptyList(), false));
+        ChannelFuture sendFuture = channel.writeAndFlush(new 
OutNetworkObject(handshakeStartMessage, emptyList(), false));
 
         NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, 
throwable) -> {
             if (throwable != null) {
@@ -134,6 +148,12 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
         if (message instanceof HandshakeStartResponseMessage) {
             HandshakeStartResponseMessage msg = 
(HandshakeStartResponseMessage) message;
 
+            if (staleIdDetector.isIdStale(msg.launchId().toString())) {
+                handleStaleClientId(msg);
+
+                return;
+            }
+
             this.remoteLaunchId = msg.launchId();
             this.remoteConsistentId = msg.consistentId();
             this.receivedCount = msg.receivedCount();
@@ -150,6 +170,19 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
             return;
         }
 
+        if (message instanceof HandshakeRejectedMessage) {
+            HandshakeRejectedMessage msg = (HandshakeRejectedMessage) message;
+
+            LOG.warn("Handshake rejected by client: {}", msg.reason());
+
+            handshakeCompleteFuture.completeExceptionally(new 
HandshakeException(msg.reason()));
+
+            // TODO: IGNITE-16899 Perhaps we need to fail the node by 
FailureHandler
+            failureHandler.handleFailure(new IgniteException("Handshake 
rejected by client: " + msg.reason()));
+
+            return;
+        }
+
         assert recoveryDescriptor != null : "Wrong server handshake flow";
 
         if (recoveryDescriptor.unacknowledgedCount() == 0) {
@@ -159,6 +192,25 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
         ctx.fireChannelRead(message);
     }
 
+    /**
+     * Rejects the handshake of a client whose launch ID is stale. It sends a {@link HandshakeRejectedMessage} to the
+     * client, then fails the handshake future. If the rejection could not be written, the future fails with the send
+     * error; otherwise it fails with the rejection reason.
+     *
+     * <p>NOTE(review): the channel is not closed here; presumably whoever observes the failed handshake future
+     * closes it — confirm.
+     *
+     * @param msg Handshake start response received from the client.
+     */
+    private void handleStaleClientId(HandshakeStartResponseMessage msg) {
+        String reason = msg.launchId() + " is stale, client should be restarted to be allowed to connect";
+
+        HandshakeRejectedMessage rejection = messageFactory.handshakeRejectedMessage().reason(reason).build();
+
+        ChannelFuture writeFuture = channel.writeAndFlush(new OutNetworkObject(rejection, emptyList(), false));
+
+        NettyUtils.toCompletableFuture(writeFuture).whenComplete((ignored, sendError) -> {
+            HandshakeException failure;
+
+            if (sendError == null) {
+                failure = new HandshakeException(reason);
+            } else {
+                failure = new HandshakeException(
+                        "Failed to send handshake rejected message: " + sendError.getMessage(),
+                        sendError
+                );
+            }
+
+            handshakeCompleteFuture.completeExceptionally(failure);
+        });
+    }
+
     private void handshake(RecoveryDescriptor descriptor) {
         PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, 
createMessageHandler(), messageFactory);
 
@@ -167,7 +219,7 @@ public class RecoveryServerHandshakeManager implements 
HandshakeManager {
                 .build();
 
         CompletableFuture<Void> sendFuture = NettyUtils.toCompletableFuture(
-                channel.write(new OutNetworkObject(response, 
Collections.emptyList(), false))
+                channel.write(new OutNetworkObject(response, emptyList(), 
false))
         );
 
         descriptor.acknowledge(receivedCount);
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleIdDetector.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleIdDetector.java
new file mode 100644
index 0000000000..ee04b895ea
--- /dev/null
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleIdDetector.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+/**
+ * Tells whether an ID that identifies a node at the networking level is stale.
+ *
+ * <p>An ID becomes stale once the node that used it disappears from the Physical Topology.
+ */
+@FunctionalInterface
+public interface StaleIdDetector {
+    /**
+     * Checks whether the given node ID is stale.
+     *
+     * @param nodeId Node ID to check.
+     * @return {@code true} if and only if {@code nodeId} is stale.
+     */
+    boolean isIdStale(String nodeId);
+}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleIds.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleIds.java
new file mode 100644
index 0000000000..8fdd4c915c
--- /dev/null
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleIds.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+/**
+ * A mutable collection of stale node IDs. Besides checking whether an ID is stale, it allows registering new stale IDs.
+ *
+ * @see StaleIdDetector
+ */
+public interface StaleIds extends StaleIdDetector {
+    /**
+     * Registers the given node ID as stale.
+     *
+     * @param nodeId Node ID to register as stale.
+     */
+    void markAsStale(String nodeId);
+}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStateIds.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStateIds.java
new file mode 100644
index 0000000000..3f75b7b3ac
--- /dev/null
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStateIds.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * {@link StaleIds} implementation using Vault as a persistent storage.
+ *
+ * <p>Stale IDs are kept in memory in insertion order. They are persisted under a single Vault key as one string
+ * separated by line feeds, so IDs must not contain line feeds. The in-memory state is lazily loaded from the Vault on
+ * the first operation. At most {@code maxIdsToRemember} IDs are remembered; when the limit is exceeded, the oldest
+ * ones are forgotten first.
+ *
+ * <p>All public operations are synchronized and may block while waiting for the Vault.
+ */
+public class VaultStateIds implements StaleIds {
+    /** Vault key under which the stale IDs are persisted. */
+    private static final ByteArray STALE_IDS_KEY = new ByteArray("network.staleIds");
+
+    /** Separator between IDs in the persisted representation. */
+    private static final String ID_SEPARATOR = "\n";
+
+    /** Default limit on the number of remembered stale IDs. */
+    private static final int DEFAULT_MAX_IDS_TO_REMEMBER = 10_000;
+
+    private final VaultManager vaultManager;
+
+    private final int maxIdsToRemember;
+
+    /** Stale IDs in insertion order (oldest first); {@code null} until loaded from the Vault. Guarded by {@code this}. */
+    private Set<String> staleIds;
+
+    /**
+     * Creates an instance that remembers up to {@value #DEFAULT_MAX_IDS_TO_REMEMBER} stale IDs.
+     *
+     * @param vaultManager Vault used as the persistent storage.
+     */
+    public VaultStateIds(VaultManager vaultManager) {
+        this(vaultManager, DEFAULT_MAX_IDS_TO_REMEMBER);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param vaultManager Vault used as the persistent storage.
+     * @param maxIdsToRemember Maximum number of stale IDs to remember; must not be negative.
+     * @throws IllegalArgumentException If {@code maxIdsToRemember} is negative.
+     */
+    public VaultStateIds(VaultManager vaultManager, int maxIdsToRemember) {
+        // A negative limit would make the eviction in markAsStale() iterate past the end of the set.
+        if (maxIdsToRemember < 0) {
+            throw new IllegalArgumentException("maxIdsToRemember must not be negative: " + maxIdsToRemember);
+        }
+
+        this.vaultManager = vaultManager;
+        this.maxIdsToRemember = maxIdsToRemember;
+    }
+
+    @Override
+    public synchronized boolean isIdStale(String nodeId) {
+        loadFromVaultIfFirstOperation();
+
+        return staleIds.contains(nodeId);
+    }
+
+    /** Loads the stale IDs from the Vault unless they are already loaded. Only called while holding {@code this} lock. */
+    private void loadFromVaultIfFirstOperation() {
+        if (staleIds == null) {
+            staleIds = loadStaleIdsFromVault();
+        }
+    }
+
+    /** Reads the persisted stale IDs from the Vault (blocking), preserving their order. */
+    private Set<String> loadStaleIdsFromVault() {
+        VaultEntry entry = vaultManager.get(STALE_IDS_KEY).join();
+
+        Set<String> result = new LinkedHashSet<>();
+
+        if (entry == null) {
+            return result;
+        }
+
+        Collections.addAll(result, new String(entry.value(), UTF_8).split(ID_SEPARATOR));
+
+        // An empty persisted value (saved when nothing is remembered, e.g. with maxIdsToRemember == 0) splits into
+        // a single empty string, which is not a real ID.
+        result.remove("");
+
+        return result;
+    }
+
+    @Override
+    public synchronized void markAsStale(String nodeId) {
+        loadFromVaultIfFirstOperation();
+
+        boolean changed = staleIds.add(nodeId);
+
+        // Forget the oldest IDs (LinkedHashSet iterates in insertion order) to stay within the limit.
+        Iterator<String> iterator = staleIds.iterator();
+        while (staleIds.size() > maxIdsToRemember) {
+            iterator.next();
+            iterator.remove();
+
+            changed = true;
+        }
+
+        // Skip the blocking Vault write when the ID was already remembered and nothing was evicted.
+        if (changed) {
+            saveIdsToVault();
+        }
+    }
+
+    /** Persists the current stale IDs to the Vault (blocking). */
+    private void saveIdsToVault() {
+        String joinedIds = String.join(ID_SEPARATOR, staleIds);
+
+        vaultManager.put(STALE_IDS_KEY, joinedIds.getBytes(UTF_8)).join();
+    }
+}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
new file mode 100644
index 0000000000..88c5efe3f9
--- /dev/null
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery.message;
+
+import static 
org.apache.ignite.internal.network.NetworkMessageTypes.HANDSHAKE_REJECTED;
+
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Handshake rejected message; contains the reason for the rejection.
+ * This message is sent from a server to a client or vice versa.
+ * Once this message is received, it makes no sense to retry connecting with the same node identity: the launch ID
+ * must be changed before a retry.
+ */
+@Transferable(HANDSHAKE_REJECTED)
+public interface HandshakeRejectedMessage extends InternalMessage {
+    /**
+     * Returns the rejection reason.
+     *
+     * @return Reason of the rejection.
+     */
+    String reason();
+}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
 
b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index c23b656afd..7f0d8a2234 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -40,6 +40,7 @@ import 
org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.configuration.NetworkView;
 import org.apache.ignite.internal.network.configuration.ScaleCubeView;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.recovery.StaleIds;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
 import 
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
 import org.apache.ignite.internal.network.serialization.SerializationService;
@@ -47,6 +48,7 @@ import 
org.apache.ignite.internal.network.serialization.UserObjectSerializationC
 import 
org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.AbstractClusterService;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.DefaultMessagingService;
 import org.apache.ignite.network.NettyBootstrapFactory;
@@ -54,6 +56,7 @@ import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NodeFinder;
 import org.apache.ignite.network.NodeFinderFactory;
 import org.apache.ignite.network.NodeMetadata;
+import org.apache.ignite.network.TopologyEventHandler;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 
 /**
@@ -77,7 +80,8 @@ public class ScaleCubeClusterServiceFactory {
             String consistentId,
             NetworkConfiguration networkConfiguration,
             NettyBootstrapFactory nettyBootstrapFactory,
-            MessageSerializationRegistry serializationRegistry
+            MessageSerializationRegistry serializationRegistry,
+            StaleIds staleIds
     ) {
         var messageFactory = new NetworkMessagesFactory();
 
@@ -114,7 +118,8 @@ public class ScaleCubeClusterServiceFactory {
                         serializationService,
                         launchId,
                         consistentId,
-                        nettyBootstrapFactory
+                        nettyBootstrapFactory,
+                        staleIds
                 );
 
                 connectionMgr.start();
@@ -135,7 +140,11 @@ public class ScaleCubeClusterServiceFactory {
                                 topologyService.onMembershipEvent(event);
                             }
                         })
-                        .config(opts -> 
opts.memberAlias(consistentId).metadataCodec(METADATA_CODEC))
+                        .config(opts -> opts
+                                .memberId(launchId.toString())
+                                .memberAlias(consistentId)
+                                .metadataCodec(METADATA_CODEC)
+                        )
                         .transport(opts -> 
opts.transportFactory(transportConfig -> transport))
                         .membership(opts -> 
opts.seedMembers(parseAddresses(finder.findNodes())));
 
@@ -145,6 +154,13 @@ public class ScaleCubeClusterServiceFactory {
                 topologyService.setCluster(cluster);
                 messagingService.setConnectionManager(connectionMgr);
 
+                topologyService.addEventHandler(new TopologyEventHandler() {
+                    @Override
+                    public void onDisappeared(ClusterNode member) {
+                        staleIds.markAsStale(member.id());
+                    }
+                });
+
                 cluster.startAwait();
 
                 // emit an artificial event as if the local member has joined 
the topology (ScaleCube doesn't do that)
@@ -199,6 +215,7 @@ public class ScaleCubeClusterServiceFactory {
                 cluster.updateMetadata(metadata).subscribe();
                 topologyService.updateLocalMetadata(metadata);
             }
+
         };
     }
 
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index 3bbf9881d8..31eabb3539 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -38,10 +38,13 @@ import 
org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.messages.TestMessage;
 import org.apache.ignite.internal.network.messages.TestMessagesFactory;
+import org.apache.ignite.internal.network.recovery.AllIdsAreFresh;
+import org.apache.ignite.internal.network.recovery.AllIdsAreStale;
 import 
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
 import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
 import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
 import 
org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.network.recovery.StaleIdDetector;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
 import 
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
 import 
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
@@ -254,7 +257,7 @@ public class RecoveryHandshakeTest {
     }
 
     /**
-     * Tests that message was received exactly once in case if network failure 
during acknowledgement.
+     * Tests that message was received exactly once in case of network failure 
during acknowledgement.
      *
      * @param serverDidntReceiveAck {@code true} if server didn't receive the 
acknowledgement, {@code false} if client didn't receive
      *                              the acknowledgement.
@@ -351,6 +354,77 @@ public class RecoveryHandshakeTest {
         assertFalse(clientSideChannel.finish());
     }
 
+    /**
+     * Tests that the handshake fails on both sides when the server's {@link StaleIdDetector} reports
+     * every ID (including the client's) as stale.
+     */
+    @Test
+    public void serverFailsHandshakeIfClientIdIsAlreadySeen() throws Exception 
{
+        RecoveryDescriptorProvider clientRecovery = 
createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = 
createRecoveryDescriptorProvider();
+
+        RecoveryClientHandshakeManager clientHandshakeManager = 
createRecoveryClientHandshakeManager(clientRecovery);
+        // Only the server side is given a detector that treats all IDs as stale.
+        RecoveryServerHandshakeManager serverHandshakeManager = 
createRecoveryServerHandshakeManager(
+                "server",
+                UUID.randomUUID(),
+                serverRecovery,
+                new AllIdsAreStale()
+        );
+
+        EmbeddedChannel clientSideChannel = 
setupChannel(clientHandshakeManager, noMessageListener);
+
+        EmbeddedChannel serverSideChannel = 
setupChannel(serverHandshakeManager, noMessageListener);
+
+        assertTrue(serverSideChannel.isActive());
+
+        // Server -> client, client -> server, then server -> client again; the last exchange
+        // presumably delivers the server's rejection to the client.
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        // Nothing else must be left in either outbound queue.
+        assertNull(clientSideChannel.readOutbound());
+        assertNull(serverSideChannel.readOutbound());
+
+        checkHandshakeCompletedExceptionally(serverHandshakeManager);
+        checkHandshakeCompletedExceptionally(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+
+        assertFalse(serverSideChannel.finish());
+        assertFalse(clientSideChannel.finish());
+    }
+
+    /**
+     * Tests that the handshake fails on both sides when the client's {@link StaleIdDetector} reports
+     * every ID (including the server's) as stale.
+     */
+    @Test
+    public void clientFailsHandshakeIfServerIdIsAlreadySeen() throws Exception 
{
+        RecoveryDescriptorProvider clientRecovery = 
createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = 
createRecoveryDescriptorProvider();
+
+        // Only the client side is given a detector that treats all IDs as stale.
+        RecoveryClientHandshakeManager clientHandshakeManager = 
createRecoveryClientHandshakeManager(
+                "client",
+                UUID.randomUUID(),
+                clientRecovery,
+                new AllIdsAreStale()
+        );
+        RecoveryServerHandshakeManager serverHandshakeManager = 
createRecoveryServerHandshakeManager(serverRecovery);
+
+        EmbeddedChannel clientSideChannel = 
setupChannel(clientHandshakeManager, noMessageListener);
+
+        EmbeddedChannel serverSideChannel = 
setupChannel(serverHandshakeManager, noMessageListener);
+
+        assertTrue(serverSideChannel.isActive());
+
+        // Only two exchanges are needed here. The client presumably rejects on the server's first
+        // message, and the client -> server exchange then carries that rejection to the server.
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+
+        // Nothing else must be left in either outbound queue.
+        assertNull(clientSideChannel.readOutbound());
+        assertNull(serverSideChannel.readOutbound());
+
+        checkHandshakeCompletedExceptionally(serverHandshakeManager);
+        checkHandshakeCompletedExceptionally(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+
+        assertFalse(serverSideChannel.finish());
+        assertFalse(clientSideChannel.finish());
+    }
+
     /** Message listener that accepts a specific message only once. */
     private static class MessageListener implements Consumer<InNetworkObject> {
         /** Expected message. */
@@ -396,6 +470,14 @@ public class RecoveryHandshakeTest {
         assertFalse(handshakeFuture.isCancelled());
     }
 
+    /**
+     * Asserts that the manager's handshake future is done, completed exceptionally, and not cancelled.
+     *
+     * @param manager Handshake manager whose future is checked.
+     */
+    private void checkHandshakeCompletedExceptionally(HandshakeManager 
manager) {
+        CompletableFuture<NettySender> handshakeFuture = 
manager.handshakeFuture();
+
+        assertTrue(handshakeFuture.isDone());
+        assertTrue(handshakeFuture.isCompletedExceptionally());
+        // Cancellation also counts as exceptional completion, so rule it out explicitly.
+        assertFalse(handshakeFuture.isCancelled());
+    }
+
     private void addUnacknowledgedMessages(RecoveryDescriptor 
recoveryDescriptor) {
         TestMessage msg = 
TEST_MESSAGES_FACTORY.testMessage().msg("test").build();
         recoveryDescriptor.add(new OutNetworkObject(msg, 
Collections.emptyList()));
@@ -447,7 +529,12 @@ public class RecoveryHandshakeTest {
 
     private RecoveryClientHandshakeManager 
createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
             RecoveryDescriptorProvider provider) {
-        return new RecoveryClientHandshakeManager(launchId, consistentId, 
CONNECTION_ID, provider);
+        // Default to a detector that never reports an ID as stale.
+        return createRecoveryClientHandshakeManager(consistentId, launchId, 
provider, new AllIdsAreFresh());
+    }
+
+    /**
+     * Creates a client handshake manager that uses the given {@link StaleIdDetector}.
+     */
+    private RecoveryClientHandshakeManager 
createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
+            RecoveryDescriptorProvider provider, StaleIdDetector 
staleIdDetector) {
+        return new RecoveryClientHandshakeManager(launchId, consistentId, 
CONNECTION_ID, provider, staleIdDetector);
     }
 
     private RecoveryServerHandshakeManager 
createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
@@ -456,7 +543,12 @@ public class RecoveryHandshakeTest {
 
     private RecoveryServerHandshakeManager 
createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
             RecoveryDescriptorProvider provider) {
-        return new RecoveryServerHandshakeManager(launchId, consistentId, 
MESSAGE_FACTORY, provider);
+        // Default to a detector that never reports an ID as stale.
+        return createRecoveryServerHandshakeManager(consistentId, launchId, 
provider, new AllIdsAreFresh());
+    }
+
+    /**
+     * Creates a server handshake manager that uses the given {@link StaleIdDetector}.
+     */
+    private RecoveryServerHandshakeManager 
createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
+            RecoveryDescriptorProvider provider, StaleIdDetector 
staleIdDetector) {
+        return new RecoveryServerHandshakeManager(launchId, consistentId, 
MESSAGE_FACTORY, provider, staleIdDetector);
     }
 
     private RecoveryDescriptorProvider createRecoveryDescriptorProvider() {
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStateIdsTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStateIdsTest.java
new file mode 100644
index 0000000000..cd2437a403
--- /dev/null
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStateIdsTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link VaultStateIds} backed by a mocked {@link VaultManager}. Stale IDs are expected under the
+ * {@code network.staleIds} vault key as UTF-8 text with one ID per line.
+ */
+@ExtendWith(MockitoExtension.class)
+class VaultStateIdsTest {
+    @Mock
+    private VaultManager vaultManager;
+
+    /** Vault key under which the stale IDs are expected to be stored. */
+    private final ByteArray staleIdsKey = new ByteArray("network.staleIds");
+
+    private VaultStateIds staleIds;
+
+    @BeforeEach
+    void createObjectToTest() {
+        staleIds = new VaultStateIds(vaultManager);
+    }
+
+    /** IDs listed in the vault entry are reported as stale; an unlisted ID is not. */
+    @Test
+    void consultsVaultWhenCheckingForStaleness() {
+        doReturn(completedFuture(new VaultEntry(staleIdsKey, 
"id1\nid2\nid3".getBytes(UTF_8))))
+                .when(vaultManager).get(staleIdsKey);
+
+        assertThat(staleIds.isIdStale("id1"), is(true));
+        assertThat(staleIds.isIdStale("id2"), is(true));
+        assertThat(staleIds.isIdStale("id3"), is(true));
+        assertThat(staleIds.isIdStale("id10"), is(false));
+    }
+
+    /** Several staleness checks read the vault only once. */
+    @Test
+    void cachesVaultStateInMemory() {
+        doReturn(completedFuture(new VaultEntry(staleIdsKey, 
"id1\nid2\nid3".getBytes(UTF_8))))
+                .when(vaultManager).get(staleIdsKey);
+
+        staleIds.isIdStale("id1");
+        staleIds.isIdStale("id2");
+        staleIds.isIdStale("id3");
+
+        verify(vaultManager, times(1)).get(any());
+    }
+
+    /** Each newly marked ID is persisted by writing the full list of IDs, in marking order. */
+    @Test
+    void savesNewStaleIdsToVault() {
+        doReturn(completedFuture(null)).when(vaultManager).get(staleIdsKey);
+        // NOTE(review): there is no explicit verify() below. The expected put() calls are checked only if
+        // MockitoExtension strictness reports unused stubbings (STRICT_STUBS by default). Consider an
+        // explicit verify.
+        doReturn(completedFuture(null))
+                .when(vaultManager).put(staleIdsKey, "id2".getBytes(UTF_8));
+        doReturn(completedFuture(null))
+                .when(vaultManager).put(staleIdsKey, 
"id2\nid1".getBytes(UTF_8));
+
+        staleIds.markAsStale("id2");
+        staleIds.markAsStale("id1");
+    }
+
+    /** With a limit of 2 passed to the constructor, the oldest marked ID is dropped from the saved list. */
+    @Test
+    void respectsMaxIdsLimit() {
+        staleIds = new VaultStateIds(vaultManager, 2);
+
+        doReturn(completedFuture(null)).when(vaultManager).get(staleIdsKey);
+
+        // Captures the value passed to the most recent put() call.
+        AtomicReference<String> lastSavedIds = new AtomicReference<>();
+
+        doAnswer(invocation -> {
+            byte[] value = invocation.getArgument(1);
+
+            lastSavedIds.set(new String(value, UTF_8));
+
+            return completedFuture(null);
+        }).when(vaultManager).put(eq(staleIdsKey), any());
+
+        staleIds.markAsStale("id3");
+        staleIds.markAsStale("id2");
+        staleIds.markAsStale("id1");
+
+        assertThat(lastSavedIds.get(), is("id2\nid1"));
+    }
+
+    /** IDs already in the vault are loaded before the first save, so they are kept in the saved value. */
+    @Test
+    void loadsBeforeDoingFirstSave() {
+        // NOTE(review): lenient() looks unnecessary, because the verified put() value depends on this get()
+        // result. Confirm before removing.
+        lenient().doReturn(completedFuture(new VaultEntry(staleIdsKey, 
"id1".getBytes(UTF_8))))
+                .when(vaultManager).get(staleIdsKey);
+        
doReturn(completedFuture(null)).when(vaultManager).put(eq(staleIdsKey), any());
+
+        staleIds.markAsStale("id2");
+
+        verify(vaultManager).put(staleIdsKey, "id1\nid2".getBytes(UTF_8));
+    }
+}
diff --git 
a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
 
b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
index 5987dcf41f..c9a89dc01d 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -33,7 +33,6 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -47,9 +46,11 @@ import 
org.apache.ignite.internal.network.messages.TestMessageSerializationFacto
 import org.apache.ignite.internal.network.messages.TestMessageTypes;
 import org.apache.ignite.internal.network.messages.TestMessagesFactory;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.recovery.AllIdsAreFresh;
 import 
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
 import 
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManagerFactory;
 import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.recovery.StaleIdDetector;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
 import 
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
 import org.apache.ignite.internal.network.serialization.SerializationService;
@@ -101,7 +102,7 @@ class DefaultMessagingServiceTest {
     );
 
+    /**
+     * Stubs the topology service to resolve {@code senderNode} by its consistent ID. The stub is lenient,
+     * presumably because not every test performs this lookup.
+     */
     @BeforeEach
-    void setUp() throws InterruptedException, ExecutionException {
+    void setUp() {
         
lenient().when(topologyService.getByConsistentId(eq(senderNode.name()))).thenReturn(senderNode);
     }
 
@@ -290,13 +291,17 @@ class DefaultMessagingServiceTest {
         NettyBootstrapFactory bootstrapFactory = new 
NettyBootstrapFactory(networkConfig, eventLoopGroupNamePrefix);
         bootstrapFactory.start();
 
+        StaleIdDetector staleIdDetector = new AllIdsAreFresh();
+
+        UUID launchId = UUID.randomUUID();
         ConnectionManager connectionManager = new ConnectionManager(
                 networkConfig.value(),
                 serializationService,
-                UUID.randomUUID(),
+                launchId,
                 node.name(),
                 bootstrapFactory,
-                clientHandshakeManagerFactoryAdding(beforeHandshake)
+                staleIdDetector,
+                clientHandshakeManagerFactoryAdding(beforeHandshake, 
staleIdDetector)
         );
         connectionManager.start();
 
@@ -305,7 +310,8 @@ class DefaultMessagingServiceTest {
         return new Services(connectionManager, messagingService);
     }
 
-    private static RecoveryClientHandshakeManagerFactory 
clientHandshakeManagerFactoryAdding(Runnable beforeHandshake) {
+    private static RecoveryClientHandshakeManagerFactory 
clientHandshakeManagerFactoryAdding(Runnable beforeHandshake,
+            StaleIdDetector staleIdDetector) {
         return new RecoveryClientHandshakeManagerFactory() {
             @Override
             public RecoveryClientHandshakeManager create(
@@ -313,10 +319,17 @@ class DefaultMessagingServiceTest {
                     String consistentId,
                     short connectionId,
                     RecoveryDescriptorProvider recoveryDescriptorProvider) {
-                return new RecoveryClientHandshakeManager(launchId, 
consistentId, connectionId, recoveryDescriptorProvider) {
+                return new RecoveryClientHandshakeManager(
+                        launchId,
+                        consistentId,
+                        connectionId,
+                        recoveryDescriptorProvider,
+                        staleIdDetector
+                ) {
                     @Override
                     protected void finishHandshake() {
                         beforeHandshake.run();
+
                         super.finishHandshake();
                     }
                 };
diff --git 
a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/recovery/AllIdsAreFresh.java
 
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/recovery/AllIdsAreFresh.java
new file mode 100644
index 0000000000..fbbe03d201
--- /dev/null
+++ 
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/recovery/AllIdsAreFresh.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+/**
+ * {@link StaleIdDetector} that reports all IDs as fresh (i.e. not stale).
+ * The opposite of {@link AllIdsAreStale}.
+ */
+public class AllIdsAreFresh implements StaleIdDetector {
+    /**
+     * Reports every ID as fresh.
+     *
+     * @param nodeId ID to check (ignored).
+     * @return Always {@code false}.
+     */
+    @Override
+    public boolean isIdStale(String nodeId) {
+        return false;
+    }
+}
diff --git 
a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/recovery/AllIdsAreStale.java
 
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/recovery/AllIdsAreStale.java
new file mode 100644
index 0000000000..be7d4eb099
--- /dev/null
+++ 
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/recovery/AllIdsAreStale.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+/**
+ * {@link StaleIdDetector} that reports all IDs as stale.
+ * The opposite of {@link AllIdsAreFresh}.
+ */
+public class AllIdsAreStale implements StaleIdDetector {
+    /**
+     * Reports every ID as stale.
+     *
+     * @param nodeId ID to check (ignored).
+     * @return Always {@code true}.
+     */
+    @Override
+    public boolean isIdStale(String nodeId) {
+        return true;
+    }
+}
diff --git 
a/modules/network/src/testFixtures/java/org/apache/ignite/network/scalecube/TestScaleCubeClusterServiceFactory.java
 
b/modules/network/src/testFixtures/java/org/apache/ignite/network/scalecube/TestScaleCubeClusterServiceFactory.java
index 57cc3716dc..19d94766df 100644
--- 
a/modules/network/src/testFixtures/java/org/apache/ignite/network/scalecube/TestScaleCubeClusterServiceFactory.java
+++ 
b/modules/network/src/testFixtures/java/org/apache/ignite/network/scalecube/TestScaleCubeClusterServiceFactory.java
@@ -19,7 +19,6 @@ package org.apache.ignite.network.scalecube;
 
 import io.scalecube.cluster.ClusterConfig;
 import org.apache.ignite.internal.network.configuration.ClusterMembershipView;
-import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
 
 /**
  * Scalecube test factory. Provides fast detection time.
diff --git 
a/modules/network/src/testFixtures/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
 
b/modules/network/src/testFixtures/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
index 97e95ca0bc..f298c7c24f 100644
--- 
a/modules/network/src/testFixtures/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
+++ 
b/modules/network/src/testFixtures/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
@@ -30,6 +30,8 @@ import 
org.apache.ignite.internal.configuration.ConfigurationManager;
 import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.configuration.NodeFinderType;
+import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
+import org.apache.ignite.internal.network.recovery.StaleIds;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.network.AbstractClusterService;
 import org.apache.ignite.network.ClusterService;
@@ -75,9 +77,23 @@ public class ClusterServiceTestUtils {
      * @param nodeFinder               Node finder.
      */
     public static ClusterService clusterService(TestInfo testInfo, int port, 
NodeFinder nodeFinder) {
+        // No stale ID store supplied: use a new InMemoryStaleIds instance.
+        return clusterService(testInfo, port, nodeFinder, new 
InMemoryStaleIds());
+    }
+
+    /**
+     * Creates a cluster service and required node configuration manager 
beneath it. Populates node configuration with specified port.
+     * Manages configuration manager lifecycle: on cluster service start 
starts node configuration manager, on cluster service stop - stops
+     * node configuration manager.
+     *
+     * @param testInfo                 Test info.
+     * @param port                     Local port.
+     * @param nodeFinder               Node finder.
+     * @param staleIds                 Used to track stale launch IDs.
+     * @return Cluster service instance.
+     */
+    public static ClusterService clusterService(TestInfo testInfo, int port, 
NodeFinder nodeFinder, StaleIds staleIds) {
         String nodeName = testNodeName(testInfo, port);
 
-        return clusterService(nodeName, port, nodeFinder);
+        return clusterService(nodeName, port, nodeFinder, staleIds);
     }
 
     /**
@@ -89,6 +105,19 @@ public class ClusterServiceTestUtils {
      * @return Cluster service instance.
      */
     public static ClusterService clusterService(String nodeName, int port, 
NodeFinder nodeFinder) {
+        // No stale ID store supplied: use a new InMemoryStaleIds instance.
+        return clusterService(nodeName, port, nodeFinder, new 
InMemoryStaleIds());
+    }
+
+    /**
+     * Creates a cluster service with predefined name.
+     *
+     * @param nodeName Node name.
+     * @param port Local port.
+     * @param nodeFinder Node finder.
+     * @param staleIds Used to track stale launch IDs.
+     * @return Cluster service instance.
+     */
+    private static ClusterService clusterService(String nodeName, int port, 
NodeFinder nodeFinder, StaleIds staleIds) {
         ConfigurationManager nodeConfigurationMgr = new ConfigurationManager(
                 Collections.singleton(NetworkConfiguration.KEY),
                 Set.of(),
@@ -107,7 +136,8 @@ public class ClusterServiceTestUtils {
                 nodeName,
                 networkConfiguration,
                 bootstrapFactory,
-                serializationRegistry
+                serializationRegistry,
+                staleIds
         );
 
         assert nodeFinder instanceof StaticNodeFinder : "Only StaticNodeFinder 
is supported at the moment";
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
index 9eaa98ac06..8626d2e22f 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
@@ -47,6 +47,7 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.service.CommandClosure;
@@ -172,7 +173,8 @@ public class ItTruncateSuffixAndRestartTest {
                     nodeName,
                     networkConfiguration,
                     nettyBootstrapFactory,
-                    defaultSerializationRegistry()
+                    defaultSerializationRegistry(),
+                    new InMemoryStaleIds()
             );
 
             clusterSvc.start();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
index f6cf2b43ab..1cc34ef03e 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
@@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
@@ -237,7 +238,7 @@ class ItLogicalTopologyTest extends 
ClusterPerTestIntegrationTest {
     }
 
     @Test
-    void nodeReturnedToPhysicalTopologyReturnsToLogicalTopology() throws 
Exception {
+    void nodeReturnedToPhysicalTopologyDoesNotReturnToLogicalTopology() throws 
Exception {
         IgniteImpl entryNode = node(0);
 
         IgniteImpl secondIgnite = startNode(1);
@@ -257,7 +258,7 @@ class ItLogicalTopologyTest extends 
ClusterPerTestIntegrationTest {
 
         entryNode.stopDroppingMessages();
 
-        assertTrue(secondIgniteAppeared.await(10, TimeUnit.SECONDS), "Did not 
see second node coming back in time");
+        assertFalse(secondIgniteAppeared.await(3, TimeUnit.SECONDS), "Second 
node returned to logical topology");
     }
 
     private static void makeSecondNodeDisappearForFirstNode(IgniteImpl 
firstIgnite, IgniteImpl secondIgnite) throws InterruptedException {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 9db9950775..c02d867fa3 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -89,6 +89,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.recovery.VaultStateIds;
 import org.apache.ignite.internal.raft.Loza;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -283,7 +284,8 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
                 name,
                 networkConfiguration,
                 nettyBootstrapFactory,
-                defaultSerializationRegistry()
+                defaultSerializationRegistry(),
+                new VaultStateIds(vault)
         );
 
         HybridClock hybridClock = new HybridClockImpl();
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f785d423aa..b91fd843e5 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -85,6 +85,7 @@ import 
org.apache.ignite.internal.metrics.configuration.MetricConfiguration;
 import org.apache.ignite.internal.metrics.sources.JvmMetricSource;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import 
org.apache.ignite.internal.network.configuration.NetworkConfigurationSchema;
+import org.apache.ignite.internal.network.recovery.VaultStateIds;
 import org.apache.ignite.internal.raft.Loza;
 import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -324,7 +325,8 @@ public class IgniteImpl implements Ignite {
                 name,
                 networkConfiguration,
                 nettyBootstrapFactory,
-                serializationRegistry
+                serializationRegistry,
+                new VaultStateIds(vaultMgr)
         );
 
         computeComponent = new ComputeComponentImpl(
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
index 036638970b..71dbcb273a 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
@@ -19,6 +19,7 @@ package org.apache.ignite.distributed;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.TestInfo;
@@ -51,9 +52,11 @@ public class ItTxDistributedTestThreeNodesThreeReplicas 
extends ItTxDistributedT
+    /**
+     * Waits up to 5 seconds each for the {@code accounts} and {@code customers} partitions to pass
+     * {@code assertPartitionsSame}, then runs the base-class cleanup.
+     */
     @Override
     @AfterEach
     public void after() throws Exception {
-        assertTrue(IgniteTestUtils.waitForCondition(() -> 
assertPartitionsSame(accounts, 0), 5_000));
-        assertTrue(IgniteTestUtils.waitForCondition(() -> 
assertPartitionsSame(customers, 0), 5_000));
-
-        super.after();
+        try {
+            assertTrue(IgniteTestUtils.waitForCondition(() -> 
assertPartitionsSame(accounts, 0), TimeUnit.SECONDS.toMillis(5)));
+            assertTrue(IgniteTestUtils.waitForCondition(() -> 
assertPartitionsSame(customers, 0), TimeUnit.SECONDS.toMillis(5)));
+        } finally {
+            // Always run the base-class cleanup, even if a partition check above fails.
+            super.after();
+        }
     }
 }


Reply via email to