This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 42b18d49fe7 IGNITE-22377 Fail one of nodes if a stale node detected (#7248)
42b18d49fe7 is described below

commit 42b18d49fe725b1d0e8b79c841623421b8797308
Author: Ivan Bessonov <[email protected]>
AuthorDate: Mon Jan 12 10:13:01 2026 +0300

    IGNITE-22377 Fail one of nodes if a stale node detected (#7248)
---
 .../management/topology/ItLogicalTopologyTest.java |  6 +++
 .../internal/network/file/TestTopologyService.java |  9 +++-
 ...des.java => LogicalTopologyEventsListener.java} |  8 ++--
 .../ignite/internal/network/TopologyService.java   |  7 ++-
 modules/network/build.gradle                       |  1 +
 .../node/ItNodeStalenessAndRestartTest.java        | 39 ++++++++++++++++-
 .../internal/network/DefaultMessagingService.java  |  6 ++-
 .../internal/network/netty/ConnectionManager.java  |  3 +-
 .../network/recovery/HandshakeManagerUtils.java    | 29 +++++++++++++
 .../recovery/RecoveryAcceptorHandshakeManager.java | 50 ++++++++++++++++------
 .../RecoveryInitiatorHandshakeManager.java         | 32 ++++++++------
 .../recovery/StaleNodeHandlingParametersImpl.java} | 29 ++++++-------
 .../recovery/message/HandshakeStartMessage.java    |  5 ++-
 .../message/HandshakeStartResponseMessage.java     |  5 ++-
 .../message/StaleNodeHandlingParameters.java}      | 22 +++-------
 .../scalecube/ScaleCubeTopologyService.java        | 14 +++++-
 .../network/netty/RecoveryHandshakeTest.java       |  3 +-
 .../RecoveryAcceptorHandshakeManagerTest.java      |  4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  8 ++--
 .../engine/framework/ClusterServiceFactory.java    |  9 +++-
 20 files changed, 211 insertions(+), 78 deletions(-)

diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/topology/ItLogicalTopologyTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/topology/ItLogicalTopologyTest.java
index d8ce414dbd0..57661230558 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/topology/ItLogicalTopologyTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/topology/ItLogicalTopologyTest.java
@@ -47,15 +47,19 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
 import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import 
org.apache.ignite.internal.testframework.failure.FailureManagerExtension;
+import 
org.apache.ignite.internal.testframework.failure.MuteFailureManagerLogging;
 import org.apache.ignite.internal.tostring.S;
 import org.intellij.lang.annotations.Language;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Integration tests for functionality of logical topology events subscription.
  */
 @SuppressWarnings("resource")
+@ExtendWith(FailureManagerExtension.class)
 class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
     private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
 
@@ -254,6 +258,7 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
     }
 
     @Test
+    @MuteFailureManagerLogging
     void nodeReturnedToPhysicalTopologyDoesNotReturnToLogicalTopology() throws 
Exception {
         cluster.startAndInit(1);
 
@@ -318,6 +323,7 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
     }
 
     @Test
+    @MuteFailureManagerLogging
     void nodeThatCouldNotJoinShouldBeInvalidated(TestInfo testInfo) throws 
Exception {
         cluster.startAndInit(1);
 
diff --git a/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java b/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java
index 6479eab06fa..d1275b92cfc 100644
--- a/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java
+++ b/modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/TestTopologyService.java
@@ -44,6 +44,11 @@ public class TestTopologyService extends AbstractTopologyService {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public long logicalTopologyVersion() {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public @Nullable InternalClusterNode getByAddress(NetworkAddress addr) {
         throw new UnsupportedOperationException();
@@ -78,10 +83,10 @@ public class TestTopologyService extends AbstractTopologyService {
     }
 
     @Override
-    public void onJoined(InternalClusterNode node) {
+    public void onJoined(InternalClusterNode node, long topologyVersion) {
     }
 
     @Override
-    public void onLeft(InternalClusterNode node) {
+    public void onLeft(InternalClusterNode node, long topologyVersion) {
     }
 }
diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java b/modules/network-api/src/main/java/org/apache/ignite/internal/network/LogicalTopologyEventsListener.java
similarity index 79%
copy from modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java
copy to modules/network-api/src/main/java/org/apache/ignite/internal/network/LogicalTopologyEventsListener.java
index d31c31f5b61..d223a94f88b 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/internal/network/LogicalTopologyEventsListener.java
@@ -20,19 +20,21 @@ package org.apache.ignite.internal.network;
 /**
  * Allows reacting to logical topology changes.
  */
-public interface JoinedNodes {
+public interface LogicalTopologyEventsListener {
 
     /**
      * Called when the node joins logical topology.
      *
      * @param node Node.
+     * @param topologyVersion Logical topology version.
      */
-    void onJoined(InternalClusterNode node);
+    void onJoined(InternalClusterNode node, long topologyVersion);
 
     /**
      * Called when the node leaves logical topology.
      *
      * @param node Node.
+     * @param topologyVersion Logical topology version.
      */
-    void onLeft(InternalClusterNode node);
+    void onLeft(InternalClusterNode node, long topologyVersion);
 }
diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/TopologyService.java b/modules/network-api/src/main/java/org/apache/ignite/internal/network/TopologyService.java
index 2bcc05fcadc..4896787d5d9 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/TopologyService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/internal/network/TopologyService.java
@@ -25,7 +25,7 @@ import org.jetbrains.annotations.Nullable;
  * Entry point for obtaining physical cluster topology information.
  */
 // TODO: allow removing event handlers, see 
https://issues.apache.org/jira/browse/IGNITE-14519
-public interface TopologyService extends ClusterNodeResolver, JoinedNodes {
+public interface TopologyService extends ClusterNodeResolver, 
LogicalTopologyEventsListener {
     /**
      * Returns information about the current node.
      *
@@ -47,6 +47,11 @@ public interface TopologyService extends ClusterNodeResolver, JoinedNodes {
      */
     Collection<InternalClusterNode> logicalTopologyMembers();
 
+    /**
+     * Returns the logical topology version.
+     */
+    long logicalTopologyVersion();
+
     /**
      * Registers a handler for physical topology change events.
      *
diff --git a/modules/network/build.gradle b/modules/network/build.gradle
index 2945aff22a4..310b1303cfe 100644
--- a/modules/network/build.gradle
+++ b/modules/network/build.gradle
@@ -86,6 +86,7 @@ dependencies {
     integrationTestImplementation 
testFixtures(project(':ignite-configuration'))
     integrationTestImplementation testFixtures(project(':ignite-runner'))
     integrationTestImplementation 
testFixtures(project(':ignite-failure-handler:'))
+    integrationTestImplementation libs.awaitility
     integrationTestImplementation libs.compileTesting
     integrationTestImplementation libs.netty.handler
     integrationTestImplementation libs.scalecube.cluster
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/node/ItNodeStalenessAndRestartTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/node/ItNodeStalenessAndRestartTest.java
index a7ef269561f..064737bf271 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/node/ItNodeStalenessAndRestartTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/node/ItNodeStalenessAndRestartTest.java
@@ -21,17 +21,29 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.ignite.internal.ConfigTemplates.FAST_FAILURE_DETECTION_NODE_BOOTSTRAP_CFG_TEMPLATE;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.ConfigOverride;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.failure.FailureManager;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.TopologyEventHandler;
 import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import 
org.apache.ignite.internal.testframework.failure.FailureManagerExtension;
+import 
org.apache.ignite.internal.testframework.failure.MuteFailureManagerLogging;
+import org.apache.ignite.internal.testframework.log4j2.LogInspector;
+import org.apache.logging.log4j.Level;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+@ExtendWith(FailureManagerExtension.class)
 class ItNodeStalenessAndRestartTest extends ClusterPerTestIntegrationTest {
+
+    private static final String FAILURE_MESSAGE = "Cluster segmentation 
detected, current node will be shut down";
+
     @Override
     protected int initialNodes() {
         return 2;
@@ -43,6 +55,7 @@ class ItNodeStalenessAndRestartTest extends ClusterPerTestIntegrationTest {
     }
 
     @Test
+    @MuteFailureManagerLogging
     void nodeStalenessStatusIsClearedOnRestart() throws Exception {
         IgniteImpl ignite0 = unwrapIgniteImpl(cluster.node(0));
 
@@ -59,13 +72,37 @@ class ItNodeStalenessAndRestartTest extends ClusterPerTestIntegrationTest {
         );
     }
 
+    @Test
+    @ConfigOverride(name = "ignite.failureHandler.handler.type", value = 
"stop")
+    @MuteFailureManagerLogging
+    void staleNodeIsShutDown() throws Exception {
+        IgniteImpl ignite0 = unwrapIgniteImpl(cluster.node(0));
+
+        LogInspector logInspector = new LogInspector(
+                FailureManager.class.getName(),
+                evt -> evt.getLevel() == Level.ERROR
+                        && evt.getThrown() != null
+                        && 
evt.getThrown().getMessage().contains(FAILURE_MESSAGE)
+                        && 
Thread.currentThread().getName().contains(cluster.nodeName(1))
+        );
+
+        logInspector.start();
+        try {
+            simulateNetworkPartition(ignite0);
+
+            await().timeout(10, SECONDS).until(logInspector::isMatched);
+        } finally {
+            logInspector.stop();
+        }
+    }
+
     private void simulateNetworkPartition(IgniteImpl ignite0) throws 
InterruptedException {
         CountDownLatch ignite1Left = new CountDownLatch(1);
 
         ignite0.clusterService().topologyService().addEventHandler(new 
TopologyEventHandler() {
             @Override
             public void onDisappeared(InternalClusterNode member) {
-                if (member.name().equals(cluster.node(1).name())) {
+                if (member.name().equals(cluster.nodeName(1))) {
                     ignite1Left.countDown();
                 }
             }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index a41f053db78..f6e0926f11a 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -415,7 +415,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
                         "Handshake failed [destNodeId={}, channelType={}, 
destAddr={}, localBindAddr={}]", ex,
                         nodeId, type, addr, 
connectionManager.localBindAddress()
                 );
-            } else if (!hasCause(ex, NodeStoppingException.class) && 
LOG.isInfoEnabled()) {
+            } else if (!ignorableHandshakeException(ex) && 
LOG.isInfoEnabled()) {
                 // TODO IGNITE-25802 Detect a LOOP rejection reason and retry 
the connection.
                 LOG.info(
                         "Handshake failed [message={}, destNodeId={}, 
channelType={}, destAddr={}, localBindAddr={}]",
@@ -425,6 +425,10 @@ public class DefaultMessagingService extends AbstractMessagingService {
         }
     }
 
+    private static boolean ignorableHandshakeException(Throwable ex) {
+        return hasCause(ex, NodeStoppingException.class, 
RecipientLeftException.class);
+    }
+
     private void triggerChannelCreation(UUID nodeId, ChannelType type, 
InetSocketAddress addr) {
         connectionManager.channel(nodeId, type, addr);
     }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 55ce2153bdd..b6eb5b34113 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -537,7 +537,8 @@ public class ConnectionManager implements ChannelCreationListener {
                 this,
                 stopping::get,
                 productVersionSource,
-                topologyService
+                topologyService,
+                failureProcessor
         );
     }
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
index 842ecc5454a..b4fc5b674ce 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeManagerUtils.java
@@ -23,6 +23,9 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import org.apache.ignite.internal.failure.FailureContext;
+import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.failure.FailureType;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.InternalClusterNode;
@@ -35,6 +38,8 @@ import org.apache.ignite.internal.network.netty.NettySender;
 import org.apache.ignite.internal.network.netty.NettyUtils;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
+import 
org.apache.ignite.internal.network.recovery.message.StaleNodeHandlingParameters;
+import org.apache.ignite.internal.tostring.S;
 
 class HandshakeManagerUtils {
     private static final IgniteLogger LOG = 
Loggers.forClass(HandshakeManagerUtils.class);
@@ -88,4 +93,28 @@ class HandshakeManagerUtils {
                 ? new RecipientLeftException(msg.message())
                 : new HandshakeException(msg.message());
     }
+
+    static void maybeFailOnStaleNodeDetection(
+            FailureProcessor failureProcessor,
+            StaleNodeHandlingParameters local,
+            StaleNodeHandlingParameters remote,
+            ClusterNodeMessage remoteNode
+    ) {
+        long localTopologyVersion = local.topologyVersion();
+        long remoteTopologyVersion = remote.topologyVersion();
+
+        if (localTopologyVersion >= remoteTopologyVersion) {
+            return;
+        }
+
+        String message = S.toString(
+                "Cluster segmentation detected, current node will be shut 
down",
+                "logicalTopologyVersion", localTopologyVersion, false,
+                "remoteLogicalTopologyVersion", remoteTopologyVersion, false,
+                "remoteNodeId", remoteNode.id(), false,
+                "remoteNodeName", remoteNode.name(), false
+        );
+
+        failureProcessor.process(new 
FailureContext(FailureType.CRITICAL_ERROR, null, message));
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
index ceef409062b..d8d0b7e81cb 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.recovery;
 
 import static java.util.Collections.emptyList;
 import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.network.recovery.HandshakeManagerUtils.maybeFailOnStaleNodeDetection;
 import static 
org.apache.ignite.internal.network.recovery.HandshakeTieBreaker.shouldCloseChannel;
 
 import io.netty.channel.Channel;
@@ -29,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
+import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -54,6 +56,7 @@ import org.apache.ignite.internal.network.recovery.message.HandshakeRejectionRea
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
 import org.apache.ignite.internal.network.recovery.message.ProbeMessage;
+import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.version.IgniteProductVersionSource;
 
 /**
@@ -107,9 +110,11 @@ public class RecoveryAcceptorHandshakeManager implements HandshakeManager {
     private RecoveryDescriptor recoveryDescriptor;
 
     /** Cluster topology service. */
-    @SuppressWarnings("FieldCanBeLocal")
     private final TopologyService topologyService;
 
+    /** Failure processor. */
+    private final FailureProcessor failureProcessor;
+
     /**
      * Constructor.
      *
@@ -130,7 +135,8 @@ public class RecoveryAcceptorHandshakeManager implements HandshakeManager {
             ChannelCreationListener channelCreationListener,
             BooleanSupplier stopping,
             IgniteProductVersionSource productVersionSource,
-            TopologyService topologyService
+            TopologyService topologyService,
+            FailureProcessor failureProcessor
     ) {
         this.localNode = localNode;
         this.messageFactory = messageFactory;
@@ -141,6 +147,7 @@ public class RecoveryAcceptorHandshakeManager implements HandshakeManager {
         this.stopping = stopping;
         this.productVersionSource = productVersionSource;
         this.topologyService = topologyService;
+        this.failureProcessor = failureProcessor;
 
         this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
             if (throwable != null) {
@@ -174,12 +181,19 @@ public class RecoveryAcceptorHandshakeManager implements HandshakeManager {
     /** {@inheritDoc} */
     @Override
     public void onConnectionOpen() {
-        HandshakeStartMessage handshakeStartMessage = 
messageFactory.handshakeStartMessage()
-                
.serverNode(HandshakeManagerUtils.clusterNodeToMessage(localNode))
-                .serverClusterId(clusterIdSupplier.clusterId())
-                .productName(productVersionSource.productName())
-                
.productVersion(productVersionSource.productVersion().toString())
-                .build();
+        if (stopping.getAsBoolean()) {
+            sendRejectionMessageAndFailHandshake(
+                    S.toString("The node is stopping", "name", 
localNode.name(), false),
+                    HandshakeRejectionReason.STOPPING,
+                    m -> new NodeStoppingException()
+            );
+        } else {
+            sendHandshakeStartMessage();
+        }
+    }
+
+    private void sendHandshakeStartMessage() {
+        HandshakeStartMessage handshakeStartMessage = 
createHandshakeStartMessage();
 
         ChannelFuture sendFuture = channel.writeAndFlush(new 
OutNetworkObject(handshakeStartMessage, emptyList()));
 
@@ -192,6 +206,16 @@ public class RecoveryAcceptorHandshakeManager implements HandshakeManager {
         });
     }
 
+    private HandshakeStartMessage createHandshakeStartMessage() {
+        return messageFactory.handshakeStartMessage()
+                
.serverNode(HandshakeManagerUtils.clusterNodeToMessage(localNode))
+                .serverClusterId(clusterIdSupplier.clusterId())
+                .productName(productVersionSource.productName())
+                
.productVersion(productVersionSource.productVersion().toString())
+                .topologyVersion(topologyService.logicalTopologyVersion())
+                .build();
+    }
+
     /** {@inheritDoc} */
     @Override
     public void onMessage(NetworkMessage message) {
@@ -241,14 +265,14 @@ public class RecoveryAcceptorHandshakeManager implements HandshakeManager {
     }
 
     private boolean 
possiblyRejectHandshakeStartResponse(HandshakeStartResponseMessage message) {
-        if (staleIdDetector.isIdStale(message.clientNode().id())) {
-            handleStaleInitiatorId(message);
+        if (stopping.getAsBoolean()) {
+            handleRefusalToEstablishConnectionDueToStopping(message);
 
             return true;
         }
 
-        if (stopping.getAsBoolean()) {
-            handleRefusalToEstablishConnectionDueToStopping(message);
+        if (staleIdDetector.isIdStale(message.clientNode().id())) {
+            handleStaleInitiatorId(message);
 
             return true;
         }
@@ -262,6 +286,8 @@ public class RecoveryAcceptorHandshakeManager implements HandshakeManager {
         );
 
         sendRejectionMessageAndFailHandshake(message, 
HandshakeRejectionReason.STALE_LAUNCH_ID, HandshakeException::new);
+
+        maybeFailOnStaleNodeDetection(failureProcessor, new 
StaleNodeHandlingParametersImpl(topologyService), msg, msg.clientNode());
     }
 
     private void 
handleRefusalToEstablishConnectionDueToStopping(HandshakeStartResponseMessage 
msg) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
index db34a4ead3a..c42d901c6f7 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
@@ -22,6 +22,7 @@ import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.network.netty.NettyUtils.toCompletableFuture;
 import static 
org.apache.ignite.internal.network.recovery.HandshakeManagerUtils.clusterNodeToMessage;
+import static 
org.apache.ignite.internal.network.recovery.HandshakeManagerUtils.maybeFailOnStaleNodeDetection;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -117,11 +118,9 @@ public class RecoveryInitiatorHandshakeManager implements HandshakeManager {
     private RecoveryDescriptor recoveryDescriptor;
 
     /** Cluster topology service. */
-    @SuppressWarnings("FieldCanBeLocal")
     private final TopologyService topologyService;
 
     /** Failure processor. */
-    @SuppressWarnings("FieldCanBeLocal")
     private final FailureProcessor failureProcessor;
 
     /**
@@ -324,12 +323,6 @@ public class RecoveryInitiatorHandshakeManager implements HandshakeManager {
             return true;
         }
 
-        if (staleIdDetector.isIdStale(message.serverNode().id())) {
-            handleStaleAcceptorId(message);
-
-            return true;
-        }
-
         if (clusterIdMismatch(message.serverClusterId(), 
clusterIdSupplier.clusterId())) {
             handleClusterIdMismatch(message);
 
@@ -354,6 +347,12 @@ public class RecoveryInitiatorHandshakeManager implements HandshakeManager {
             return true;
         }
 
+        if (staleIdDetector.isIdStale(message.serverNode().id())) {
+            handleStaleAcceptorId(message);
+
+            return true;
+        }
+
         return false;
     }
 
@@ -389,6 +388,8 @@ public class RecoveryInitiatorHandshakeManager implements HandshakeManager {
                 HandshakeRejectionReason.STALE_LAUNCH_ID,
                 unused -> new RecipientLeftException("Recipient is stale: " + 
msg.serverNode().id())
         );
+
+        maybeFailOnStaleNodeDetection(failureProcessor, new 
StaleNodeHandlingParametersImpl(topologyService), msg, msg.serverNode());
     }
 
     private void handleClusterIdMismatch(HandshakeStartMessage msg) {
@@ -491,11 +492,7 @@ public class RecoveryInitiatorHandshakeManager implements HandshakeManager {
     private void handshake(RecoveryDescriptor descriptor) {
         PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, 
createMessageHandler(), MESSAGE_FACTORY);
 
-        HandshakeStartResponseMessage response = 
MESSAGE_FACTORY.handshakeStartResponseMessage()
-                .clientNode(clusterNodeToMessage(localNode))
-                .receivedCount(descriptor.receivedCount())
-                .connectionId(connectionId)
-                .build();
+        HandshakeStartResponseMessage response = 
createHandshakeStartResponseMessage(descriptor);
 
         ChannelFuture sendFuture = ctx.channel().writeAndFlush(new 
OutNetworkObject(response, emptyList()));
 
@@ -508,6 +505,15 @@ public class RecoveryInitiatorHandshakeManager implements HandshakeManager {
         });
     }
 
+    protected HandshakeStartResponseMessage 
createHandshakeStartResponseMessage(RecoveryDescriptor descriptor) {
+        return MESSAGE_FACTORY.handshakeStartResponseMessage()
+                .clientNode(clusterNodeToMessage(localNode))
+                .receivedCount(descriptor.receivedCount())
+                .connectionId(connectionId)
+                .topologyVersion(topologyService.logicalTopologyVersion())
+                .build();
+    }
+
     /**
      * Creates a message handler using the consistent id of a remote node.
      *
diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleNodeHandlingParametersImpl.java
similarity index 54%
copy from modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleNodeHandlingParametersImpl.java
index d31c31f5b61..d3710357415 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleNodeHandlingParametersImpl.java
@@ -15,24 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.network;
+package org.apache.ignite.internal.network.recovery;
+
+import org.apache.ignite.internal.network.TopologyService;
+import 
org.apache.ignite.internal.network.recovery.message.StaleNodeHandlingParameters;
 
 /**
- * Allows reacting to logical topology changes.
+ * Basic implementation of {@link StaleNodeHandlingParameters}. Exists only to 
simplify the code and avoid code duplication.
  */
-public interface JoinedNodes {
+class StaleNodeHandlingParametersImpl implements StaleNodeHandlingParameters {
+    private final TopologyService topologyService;
 
-    /**
-     * Called when the node joins logical topology.
-     *
-     * @param node Node.
-     */
-    void onJoined(InternalClusterNode node);
+    StaleNodeHandlingParametersImpl(TopologyService topologyService) {
+        this.topologyService = topologyService;
+    }
 
-    /**
-     * Called when the node leaves logical topology.
-     *
-     * @param node Node.
-     */
-    void onLeft(InternalClusterNode node);
+    @Override
+    public long topologyVersion() {
+        return topologyService.logicalTopologyVersion();
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
index 2bd7f956bc3..722c0b80b22 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
@@ -28,7 +28,7 @@ import org.jetbrains.annotations.Nullable;
  * This message is sent from an acceptor to an initiator at the connection 
opening.
  */
 @Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends InternalMessage {
+public interface HandshakeStartMessage extends StaleNodeHandlingParameters, 
InternalMessage {
     /** Returns the acceptor node that sends this. */
     ClusterNodeMessage serverNode();
 
@@ -41,4 +41,7 @@ public interface HandshakeStartMessage extends InternalMessage {
 
     /** Product version of the node that sends the message. */
     String productVersion();
+
+    @Override
+    long topologyVersion();
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java
index 7b6761f213a..83e9cd793ae 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.network.message.ClusterNodeMessage;
  * This message is sent from an initiator to an acceptor as a response to the 
{@link HandshakeStartMessage}.
  */
 @Transferable(NetworkMessageTypes.HANDSHAKE_START_RESPONSE)
-public interface HandshakeStartResponseMessage extends InternalMessage {
+public interface HandshakeStartResponseMessage extends 
StaleNodeHandlingParameters, InternalMessage {
     /** Returns the initiator node that sends this. */
     ClusterNodeMessage clientNode();
 
@@ -43,4 +43,7 @@ public interface HandshakeStartResponseMessage extends InternalMessage {
      * @return Number of received messages.
      */
     long receivedCount();
+
+    @Override
+    long topologyVersion();
 }
diff --git a/modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/StaleNodeHandlingParameters.java
similarity index 66%
rename from modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java
rename to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/StaleNodeHandlingParameters.java
index d31c31f5b61..a8409f3d6b8 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/internal/network/JoinedNodes.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/StaleNodeHandlingParameters.java
@@ -15,24 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.network;
+package org.apache.ignite.internal.network.recovery.message;
 
 /**
- * Allows reacting to logical topology changes.
+ * Parameters required for handling stale state of a node.
  */
-public interface JoinedNodes {
-
-    /**
-     * Called when the node joins logical topology.
-     *
-     * @param node Node.
-     */
-    void onJoined(InternalClusterNode node);
-
-    /**
-     * Called when the node leaves logical topology.
-     *
-     * @param node Node.
-     */
-    void onLeft(InternalClusterNode node);
+public interface StaleNodeHandlingParameters {
+    /** Returns the logical topology version. */
+    long topologyVersion();
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyService.java
index e79a818c8c2..f9949236a6c 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeTopologyService.java
@@ -59,6 +59,8 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
      */
     private volatile Cluster cluster;
 
+    private volatile long topologyVersion;
+
     /** Topology members from the network address to the cluster node.. */
     private final ConcurrentMap<NetworkAddress, InternalClusterNode> members = 
new ConcurrentHashMap<>();
 
@@ -71,6 +73,11 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
     /** Topology members map from the id to the cluster node. */
     private final ConcurrentMap<UUID, InternalClusterNode> idToMemberMap = new 
ConcurrentHashMap<>();
 
+    @Override
+    public long logicalTopologyVersion() {
+        return topologyVersion;
+    }
+
     /**
      * Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency 
injection.
      *
@@ -339,18 +346,21 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
     }
 
     @Override
-    public void onJoined(InternalClusterNode node) {
+    public void onJoined(InternalClusterNode node, long topologyVersion) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Node joined logical topology [node={}]", node);
         }
         membersByConsistentIdInLogicalTopology.put(node.name(), node);
+        this.topologyVersion = topologyVersion;
     }
 
     @Override
-    public void onLeft(InternalClusterNode node) {
+    public void onLeft(InternalClusterNode node, long topologyVersion) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Node left logical topology [node={}]", node);
         }
+
         membersByConsistentIdInLogicalTopology.remove(node.name());
+        this.topologyVersion = topologyVersion;
     }
 }
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index c5f85e2233a..07d1771573c 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -810,7 +810,8 @@ public class RecoveryHandshakeTest extends BaseIgniteAbstractTest {
                 channel -> {},
                 () -> false,
                 new DefaultIgniteProductVersionSource(),
-                topologyService
+                topologyService,
+                new NoOpFailureManager()
         );
     }
 
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
index 80187692436..1424af9a75f 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryAcceptorHandshakeManagerTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
@@ -180,7 +181,8 @@ class RecoveryAcceptorHandshakeManagerTest extends HandshakeManagerTest {
                 channelCreationListener,
                 stopping,
                 new DefaultIgniteProductVersionSource(),
-                topologyService
+                topologyService,
+                new NoOpFailureManager()
         );
 
         manager.onInit(context);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 122ce290a7a..01882dc23d3 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -178,7 +178,7 @@ import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.DefaultMessagingService;
 import org.apache.ignite.internal.network.IgniteClusterImpl;
 import org.apache.ignite.internal.network.InternalClusterNode;
-import org.apache.ignite.internal.network.JoinedNodes;
+import org.apache.ignite.internal.network.LogicalTopologyEventsListener;
 import org.apache.ignite.internal.network.MessageSerializationRegistryImpl;
 import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.NettyBootstrapFactory;
@@ -1347,16 +1347,16 @@ public class IgniteImpl implements Ignite {
         );
     }
 
-    private static LogicalTopologyEventListener 
logicalTopologyJoinedNodesListener(JoinedNodes joinedNodes) {
+    private static LogicalTopologyEventListener 
logicalTopologyJoinedNodesListener(LogicalTopologyEventsListener listener) {
         return new LogicalTopologyEventListener() {
             @Override
             public void onNodeJoined(LogicalNode joinedNode, 
LogicalTopologySnapshot newTopology) {
-                joinedNodes.onJoined(joinedNode);
+                listener.onJoined(joinedNode, newTopology.version());
             }
 
             @Override
             public void onNodeLeft(LogicalNode leftNode, 
LogicalTopologySnapshot newTopology) {
-                joinedNodes.onLeft(leftNode);
+                listener.onLeft(leftNode, newTopology.version());
             }
         };
     }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
index 88fee61fa5f..d989ce29c7c 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
@@ -176,6 +176,11 @@ public class ClusterServiceFactory {
             return Commons.cast(logicalTopology.getLogicalTopology().nodes());
         }
 
+        @Override
+        public long logicalTopologyVersion() {
+            return logicalTopology.getLogicalTopology().version();
+        }
+
         /** {@inheritDoc} */
         @Override
         public @Nullable InternalClusterNode getByAddress(NetworkAddress addr) 
{
@@ -197,11 +202,11 @@ public class ClusterServiceFactory {
         }
 
         @Override
-        public void onJoined(InternalClusterNode node) {
+        public void onJoined(InternalClusterNode node, long topologyVersion) {
         }
 
         @Override
-        public void onLeft(InternalClusterNode node) {
+        public void onLeft(InternalClusterNode node, long topologyVersion) {
         }
     }
 


Reply via email to