This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 94e1adec5f5 IGNITE-26811 Do not throw HandshakeException when node is stopping (#7116)
94e1adec5f5 is described below

commit 94e1adec5f568ad9dd2b3369bc80d1ac395e84bb
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Dec 2 11:59:26 2025 +0400

    IGNITE-26811 Do not throw HandshakeException when node is stopping (#7116)
---
 .../matchers/CompletableFutureMatcher.java         |  2 +-
 .../ItBuildIndexWriteIntentsHandlingTest.java      |  2 -
 .../network/scalecube/ItNodeRestartsTest.java      | 85 +++++++++++++++++++++-
 .../internal/network/DefaultMessagingService.java  |  8 +-
 .../network/handshake/NodeStaleException.java      | 28 -------
 .../RecoveryInitiatorHandshakeManager.java         |  8 +-
 .../snapshot/incoming/IncomingSnapshotCopier.java  |  9 +--
 7 files changed, 94 insertions(+), 48 deletions(-)

diff --git 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
index 4a5dbec0a03..2b76c75c7af 100644
--- 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
+++ 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
@@ -82,7 +82,7 @@ public class CompletableFutureMatcher<T> extends 
TypeSafeMatcher<CompletableFutu
     /** {@inheritDoc} */
     @Override
     public void describeTo(Description description) {
-        description.appendText("is ").appendDescriptionOf(matcher);
+        description.appendText("is a future that completes successfully with ").appendDescriptionOf(matcher);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
index 6eb989d1d62..15815db70be 100644
--- 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
+++ 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
@@ -51,7 +51,6 @@ import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 class ItBuildIndexWriteIntentsHandlingTest extends 
ClusterPerTestIntegrationTest {
@@ -83,7 +82,6 @@ class ItBuildIndexWriteIntentsHandlingTest extends 
ClusterPerTestIntegrationTest
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-26811")
     void 
writeIntentFromTxAbandonedWhileWaitingForTransactionsToFinishShouldNotBeIndexed()
 {
         createTable(1, 1);
 
diff --git 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
index 2f461a357e5..74a150ab660 100644
--- 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
+++ 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
@@ -17,24 +17,40 @@
 
 package org.apache.ignite.internal.network.scalecube;
 
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toCollection;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
 import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
-import java.util.stream.Collectors;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.network.ClusterIdSupplier;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
+import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.NodeFinder;
+import org.apache.ignite.internal.network.RecipientLeftException;
 import org.apache.ignite.internal.network.StaticNodeFinder;
+import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.network.messages.TestMessage;
+import org.apache.ignite.internal.network.messages.TestMessagesFactory;
 import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
 import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
 import org.apache.ignite.network.NetworkAddress;
@@ -73,7 +89,7 @@ class ItNodeRestartsTest {
 
         services = addresses.stream()
                 .map(addr -> startNetwork(testInfo, addr, nodeFinder))
-                .collect(Collectors.toCollection(ArrayList::new)); // ensure 
mutability
+                .collect(toCollection(ArrayList::new)); // ensure mutability
 
         for (ClusterService service : services) {
             assertTrue(waitForTopology(service, 5, 5_000), 
service.topologyService().localMember().toString()
@@ -153,4 +169,69 @@ class ItNodeRestartsTest {
 
         return false;
     }
+
+    /**
+     * Tests that message sends only end with expected exceptions during 
recipient node restart.
+     */
+    @Test
+    public void testRestartDuringSends(TestInfo testInfo) {
+        final int initPort = 3344;
+
+        List<NetworkAddress> addresses = findLocalAddresses(initPort, initPort 
+ 2);
+
+        var nodeFinder = new StaticNodeFinder(addresses);
+
+        services = addresses.stream()
+                .map(addr -> startNetwork(testInfo, addr, nodeFinder))
+                .collect(toCollection(ArrayList::new)); // ensure mutability
+
+        for (ClusterService service : services) {
+            assertTrue(waitForTopology(service, 2, 5_000), 
service.topologyService().localMember().toString()
+                    + ", topSize=" + 
service.topologyService().allMembers().size());
+        }
+
+        ClusterService sender = services.get(0);
+        ClusterService receiver = services.get(1);
+
+        AtomicBoolean sending = new AtomicBoolean(true);
+
+        CompletableFuture<Void> sendingFuture = runAsync(() -> {
+            InternalClusterNode receiverNode = 
receiver.topologyService().localMember();
+
+            while (sending.get()) {
+                TestMessage message = new 
TestMessagesFactory().testMessage().build();
+                CompletableFuture<Void> future = 
sender.messagingService().send(receiverNode, message);
+
+                try {
+                    future.get(10, SECONDS);
+                } catch (InterruptedException | TimeoutException e) {
+                    throw new RuntimeException(e);
+                } catch (ExecutionException e) {
+                    // TODO: https://issues.apache.org/jira/browse/IGNITE-21364 - remove everything except RecipientLeftException.
+                    if (!hasCause(e, RecipientLeftException.class, 
ConnectException.class, SocketException.class, IOException.class)) {
+                        if (!hasCause(e, "Channel has been closed before handshake has finished", HandshakeException.class)) {
+                            fail("Not an expected exception", e);
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        for (int i = 0; i < 10; i++) {
+            assertThat(services.get(1).stopAsync(new ComponentContext()), 
willCompleteSuccessfully());
+
+            ClusterService restartedReceiver = startNetwork(testInfo, 
addresses.get(1), nodeFinder);
+            services.set(1, restartedReceiver);
+        }
+
+        sending.set(false);
+
+        assertThat(sendingFuture, willCompleteSuccessfully());
+    }
 }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index 2aa76f852b4..65bb7eed443 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -57,7 +57,6 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.handshake.CriticalHandshakeException;
-import org.apache.ignite.internal.network.handshake.NodeStaleException;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.message.InvokeRequest;
 import org.apache.ignite.internal.network.message.InvokeResponse;
@@ -416,12 +415,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
                         "Handshake failed [destNodeId={}, channelType={}, 
destAddr={}, localBindAddr={}]", ex,
                         nodeId, type, addr, 
connectionManager.localBindAddress()
                 );
-            } else if (hasCause(ex, NodeStaleException.class) && 
LOG.isDebugEnabled()) {
-                LOG.debug(
-                        "Handshake failed [message={}, destNodeId={}, 
channelType={}, destAddr={}, localBindAddr={}]",
-                        ex.getMessage(), nodeId, type, addr, 
connectionManager.localBindAddress()
-                );
-            } else if (!hasCause(ex, NodeStoppingException.class, 
NodeStaleException.class) && LOG.isInfoEnabled()) {
+            } else if (!hasCause(ex, NodeStoppingException.class) && 
LOG.isInfoEnabled()) {
                 // TODO IGNITE-25802 Detect a LOOP rejection reason and retry 
the connection.
                 LOG.info(
                         "Handshake failed [message={}, destNodeId={}, 
channelType={}, destAddr={}, localBindAddr={}]",
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/NodeStaleException.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/NodeStaleException.java
deleted file mode 100644
index 8200b923045..00000000000
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/NodeStaleException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.network.handshake;
-
-/** Exception indicating that a handshake error occurred because the remote 
node is became stale. */
-public class NodeStaleException extends HandshakeException {
-    private static final long serialVersionUID = 0L;
-
-    /** Constructor. */
-    public NodeStaleException(String message) {
-        super(message);
-    }
-}
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
index c104410ae5d..21c9d93559e 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
@@ -41,11 +41,11 @@ import 
org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
+import org.apache.ignite.internal.network.RecipientLeftException;
 import 
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
 import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
-import org.apache.ignite.internal.network.handshake.NodeStaleException;
 import org.apache.ignite.internal.network.netty.ChannelCreationListener;
 import org.apache.ignite.internal.network.netty.ChannelKey;
 import org.apache.ignite.internal.network.netty.HandshakeHandler;
@@ -370,7 +370,11 @@ public class RecoveryInitiatorHandshakeManager implements 
HandshakeManager {
                 msg.serverNode().name(), msg.serverNode().id()
         );
 
-        sendRejectionMessageAndFailHandshake(message, 
HandshakeRejectionReason.STALE_LAUNCH_ID, NodeStaleException::new);
+        sendRejectionMessageAndFailHandshake(
+                message,
+                HandshakeRejectionReason.STALE_LAUNCH_ID,
+                unused -> new RecipientLeftException("Recipient is stale: " + 
msg.serverNode().id())
+        );
     }
 
     private void handleClusterIdMismatch(HandshakeStartMessage msg) {
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
index f41798a6807..f1657a9d785 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -59,7 +59,7 @@ import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.lowwatermark.message.GetLowWatermarkResponse;
 import 
org.apache.ignite.internal.lowwatermark.message.LowWatermarkMessagesFactory;
 import org.apache.ignite.internal.network.InternalClusterNode;
-import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.network.RecipientLeftException;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
 import 
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse;
@@ -245,11 +245,8 @@ public class IncomingSnapshotCopier extends SnapshotCopier 
{
             } catch (CancellationException ignored) {
                 // Ignored.
             } catch (ExecutionException e) {
-                if (!hasCause(e, CancellationException.class, 
NodeStoppingException.class)) {
-                    // TODO https://issues.apache.org/jira/browse/IGNITE-26811 
HandshakeException is thrown when node is stopping.
-                    if (!hasCause(e, HandshakeException.class)) {
-                        
partitionSnapshotStorage.failureProcessor().process(new FailureContext(e, 
"Error when completing the copier"));
-                    }
+                if (!hasCause(e, CancellationException.class, 
NodeStoppingException.class, RecipientLeftException.class)) {
+                    partitionSnapshotStorage.failureProcessor().process(new 
FailureContext(e, "Error when completing the copier"));
 
                     if (isOk()) {
                         setError(RaftError.UNKNOWN, "Unknown error on 
completion the copier");

Reply via email to