This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 94e1adec5f5 IGNITE-26811 Do not throw HandshakeException when node is stopping (#7116)
94e1adec5f5 is described below
commit 94e1adec5f568ad9dd2b3369bc80d1ac395e84bb
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Dec 2 11:59:26 2025 +0400
IGNITE-26811 Do not throw HandshakeException when node is stopping (#7116)
---
.../matchers/CompletableFutureMatcher.java | 2 +-
.../ItBuildIndexWriteIntentsHandlingTest.java | 2 -
.../network/scalecube/ItNodeRestartsTest.java | 85 +++++++++++++++++++++-
.../internal/network/DefaultMessagingService.java | 8 +-
.../network/handshake/NodeStaleException.java | 28 -------
.../RecoveryInitiatorHandshakeManager.java | 8 +-
.../snapshot/incoming/IncomingSnapshotCopier.java | 9 +--
7 files changed, 94 insertions(+), 48 deletions(-)
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
index 4a5dbec0a03..2b76c75c7af 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureMatcher.java
@@ -82,7 +82,7 @@ public class CompletableFutureMatcher<T> extends
TypeSafeMatcher<CompletableFutu
/** {@inheritDoc} */
@Override
public void describeTo(Description description) {
- description.appendText("is ").appendDescriptionOf(matcher);
+ description.appendText("is a future that completes successfully with ").appendDescriptionOf(matcher);
}
/** {@inheritDoc} */
diff --git
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
index 6eb989d1d62..15815db70be 100644
---
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
+++
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexWriteIntentsHandlingTest.java
@@ -51,7 +51,6 @@ import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.Table;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
class ItBuildIndexWriteIntentsHandlingTest extends
ClusterPerTestIntegrationTest {
@@ -83,7 +82,6 @@ class ItBuildIndexWriteIntentsHandlingTest extends
ClusterPerTestIntegrationTest
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-26811")
void
writeIntentFromTxAbandonedWhileWaitingForTransactionsToFinishShouldNotBeIndexed()
{
createTable(1, 1);
diff --git
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
index 2f461a357e5..74a150ab660 100644
---
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
+++
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
@@ -17,24 +17,40 @@
package org.apache.ignite.internal.network.scalecube;
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toCollection;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
-import java.util.stream.Collectors;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
+import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NodeFinder;
+import org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.network.StaticNodeFinder;
+import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.network.messages.TestMessage;
+import org.apache.ignite.internal.network.messages.TestMessagesFactory;
import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.network.NetworkAddress;
@@ -73,7 +89,7 @@ class ItNodeRestartsTest {
services = addresses.stream()
.map(addr -> startNetwork(testInfo, addr, nodeFinder))
- .collect(Collectors.toCollection(ArrayList::new)); // ensure
mutability
+ .collect(toCollection(ArrayList::new)); // ensure mutability
for (ClusterService service : services) {
assertTrue(waitForTopology(service, 5, 5_000),
service.topologyService().localMember().toString()
@@ -153,4 +169,69 @@ class ItNodeRestartsTest {
return false;
}
+
+ /**
+ * Tests that message sends only end with expected exceptions during
recipient node restart.
+ */
+ @Test
+ public void testRestartDuringSends(TestInfo testInfo) {
+ final int initPort = 3344;
+
+ List<NetworkAddress> addresses = findLocalAddresses(initPort, initPort + 2);
+
+ var nodeFinder = new StaticNodeFinder(addresses);
+
+ services = addresses.stream()
+ .map(addr -> startNetwork(testInfo, addr, nodeFinder))
+ .collect(toCollection(ArrayList::new)); // ensure mutability
+
+ for (ClusterService service : services) {
+ assertTrue(waitForTopology(service, 2, 5_000),
service.topologyService().localMember().toString()
+ + ", topSize=" +
service.topologyService().allMembers().size());
+ }
+
+ ClusterService sender = services.get(0);
+ ClusterService receiver = services.get(1);
+
+ AtomicBoolean sending = new AtomicBoolean(true);
+
+ CompletableFuture<Void> sendingFuture = runAsync(() -> {
+ InternalClusterNode receiverNode =
receiver.topologyService().localMember();
+
+ while (sending.get()) {
+ TestMessage message = new
TestMessagesFactory().testMessage().build();
+ CompletableFuture<Void> future =
sender.messagingService().send(receiverNode, message);
+
+ try {
+ future.get(10, SECONDS);
+ } catch (InterruptedException | TimeoutException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-21364 - remove everything except RecipientLeftException.
+ if (!hasCause(e, RecipientLeftException.class, ConnectException.class, SocketException.class, IOException.class)) {
+ if (!hasCause(e, "Channel has been closed before handshake has finished", HandshakeException.class)) {
+ fail("Not an expected exception", e);
+ }
+ }
+ }
+
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ for (int i = 0; i < 10; i++) {
+ assertThat(services.get(1).stopAsync(new ComponentContext()),
willCompleteSuccessfully());
+
+ ClusterService restartedReceiver = startNetwork(testInfo,
addresses.get(1), nodeFinder);
+ services.set(1, restartedReceiver);
+ }
+
+ sending.set(false);
+
+ assertThat(sendingFuture, willCompleteSuccessfully());
+ }
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index 2aa76f852b4..65bb7eed443 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -57,7 +57,6 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.handshake.CriticalHandshakeException;
-import org.apache.ignite.internal.network.handshake.NodeStaleException;
import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
import org.apache.ignite.internal.network.message.InvokeRequest;
import org.apache.ignite.internal.network.message.InvokeResponse;
@@ -416,12 +415,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
"Handshake failed [destNodeId={}, channelType={},
destAddr={}, localBindAddr={}]", ex,
nodeId, type, addr,
connectionManager.localBindAddress()
);
- } else if (hasCause(ex, NodeStaleException.class) &&
LOG.isDebugEnabled()) {
- LOG.debug(
- "Handshake failed [message={}, destNodeId={},
channelType={}, destAddr={}, localBindAddr={}]",
- ex.getMessage(), nodeId, type, addr,
connectionManager.localBindAddress()
- );
- } else if (!hasCause(ex, NodeStoppingException.class,
NodeStaleException.class) && LOG.isInfoEnabled()) {
+ } else if (!hasCause(ex, NodeStoppingException.class) &&
LOG.isInfoEnabled()) {
// TODO IGNITE-25802 Detect a LOOP rejection reason and retry
the connection.
LOG.info(
"Handshake failed [message={}, destNodeId={},
channelType={}, destAddr={}, localBindAddr={}]",
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/NodeStaleException.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/NodeStaleException.java
deleted file mode 100644
index 8200b923045..00000000000
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/NodeStaleException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.network.handshake;
-
-/** Exception indicating that a handshake error occurred because the remote
node is became stale. */
-public class NodeStaleException extends HandshakeException {
- private static final long serialVersionUID = 0L;
-
- /** Constructor. */
- public NodeStaleException(String message) {
- super(message);
- }
-}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
index c104410ae5d..21c9d93559e 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryInitiatorHandshakeManager.java
@@ -41,11 +41,11 @@ import
org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
+import org.apache.ignite.internal.network.RecipientLeftException;
import
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
import org.apache.ignite.internal.network.handshake.HandshakeEventLoopSwitcher;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
-import org.apache.ignite.internal.network.handshake.NodeStaleException;
import org.apache.ignite.internal.network.netty.ChannelCreationListener;
import org.apache.ignite.internal.network.netty.ChannelKey;
import org.apache.ignite.internal.network.netty.HandshakeHandler;
@@ -370,7 +370,11 @@ public class RecoveryInitiatorHandshakeManager implements
HandshakeManager {
msg.serverNode().name(), msg.serverNode().id()
);
- sendRejectionMessageAndFailHandshake(message,
HandshakeRejectionReason.STALE_LAUNCH_ID, NodeStaleException::new);
+ sendRejectionMessageAndFailHandshake(
+ message,
+ HandshakeRejectionReason.STALE_LAUNCH_ID,
+ unused -> new RecipientLeftException("Recipient is stale: " +
msg.serverNode().id())
+ );
}
private void handleClusterIdMismatch(HandshakeStartMessage msg) {
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
index f41798a6807..f1657a9d785 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -59,7 +59,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.message.GetLowWatermarkResponse;
import
org.apache.ignite.internal.lowwatermark.message.LowWatermarkMessagesFactory;
import org.apache.ignite.internal.network.InternalClusterNode;
-import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.network.RecipientLeftException;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.raft.PartitionSnapshotMeta;
import
org.apache.ignite.internal.partition.replicator.network.raft.SnapshotMetaResponse;
@@ -245,11 +245,8 @@ public class IncomingSnapshotCopier extends SnapshotCopier
{
} catch (CancellationException ignored) {
// Ignored.
} catch (ExecutionException e) {
- if (!hasCause(e, CancellationException.class,
NodeStoppingException.class)) {
- // TODO https://issues.apache.org/jira/browse/IGNITE-26811
HandshakeException is thrown when node is stopping.
- if (!hasCause(e, HandshakeException.class)) {
-
partitionSnapshotStorage.failureProcessor().process(new FailureContext(e,
"Error when completing the copier"));
- }
+ if (!hasCause(e, CancellationException.class,
NodeStoppingException.class, RecipientLeftException.class)) {
+ partitionSnapshotStorage.failureProcessor().process(new
FailureContext(e, "Error when completing the copier"));
if (isOk()) {
setError(RaftError.UNKNOWN, "Unknown error on
completion the copier");