This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f3f30b79086 IGNITE-28209 Throw RecipientLeftException more often in MessagingService (#7778)
f3f30b79086 is described below

commit f3f30b79086a6f63e178d1f7098321dcbe2a2516
Author: Ivan Bessonov <[email protected]>
AuthorDate: Mon Mar 16 12:41:01 2026 +0300

    IGNITE-28209 Throw RecipientLeftException more often in MessagingService (#7778)
---
 .../internal/network/DefaultMessagingService.java  |  21 +++-
 .../network/DefaultMessagingServiceTest.java       | 110 ++++++++++++++++++---
 2 files changed, 116 insertions(+), 15 deletions(-)

diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index 6ac8f8562ca..4794fac012c 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.network;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.function.Function.identity;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.LONG_HANDLING_LOGGING_ENABLED;
 import static 
org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource.THREAD_POOLS_METRICS_SOURCE_NAME;
 import static 
org.apache.ignite.internal.network.NettyBootstrapFactory.isInNetworkThread;
@@ -49,7 +50,6 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiPredicate;
-import java.util.function.Function;
 import org.apache.ignite.internal.failure.FailureContext;
 import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.future.OrderingFuture;
@@ -324,11 +324,23 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
             return nullCompletedFuture();
         }
 
+        if (strictIdCheck && staleIdDetector.isIdStale(recipient.id())) {
+            return recipientIsStaleFuture(recipient);
+        }
+
         NetworkMessage message = correlationId != null ? 
responseFromMessage(msg, correlationId) : msg;
 
         return sendViaNetwork(recipient.id(), type, recipientAddress, message, 
strictIdCheck);
     }
 
+    private <U> CompletableFuture<U> 
recipientIsStaleFuture(InternalClusterNode recipient) {
+        metrics.incrementMessageRecipientNotFound();
+        /*
+         * Called by the send paths when the recipient's ID is stale (strictIdCheck is set and
+         * staleIdDetector.isIdStale(recipient.id()) returned true). After counting the message in the
+         * "recipient not found" metric, the caller receives a future already failed with RecipientLeftException
+         * (whose message carries the recipient's name and ID) and returns it instead of calling sendViaNetwork.
+         */
+        return failedFuture(
+                new RecipientLeftException("Recipient is stale [name=" + 
recipient.name() + ", id=" + recipient.id() + "]")
+        );
+    }
+
     private boolean shouldDropMessage(InternalClusterNode recipient, 
NetworkMessage msg) {
         BiPredicate<String, NetworkMessage> predicate = dropMessagesPredicate;
 
@@ -375,6 +387,10 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
             return responseFuture;
         }
 
+        if (strictIdCheck && staleIdDetector.isIdStale(recipient.id())) {
+            return recipientIsStaleFuture(recipient);
+        }
+
         InvokeRequest message = requestFromMessage(msg, correlationId);
 
         return sendViaNetwork(recipient.id(), type, recipientAddress, message, 
strictIdCheck)
@@ -401,7 +417,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
     ) {
         if (isInNetworkThread()) {
             return CompletableFuture.supplyAsync(() -> sendViaNetwork(nodeId, 
type, addr, message, strictIdCheck), outboundExecutor)
-                    .thenCompose(Function.identity());
+                    .thenCompose(identity());
         }
 
         List<ClassDescriptorMessage> descriptors;
@@ -412,6 +428,7 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
             return failedFuture(new IgniteException(INTERNAL_ERR, "Failed to 
marshal message: " + e.getMessage(), e));
         }
 
+        // TODO IGNITE-28225 Retry channel creation in case of network issues.
         OrderingFuture<NettySender> channelFuture = 
connectionManager.channel(nodeId, type, addr);
 
         channelFuture.whenComplete((sender, ex) -> maybeLogHandshakeError(ex, 
nodeId, type, addr));
diff --git 
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
 
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index 66b37398422..e910a7c2557 100644
--- 
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++ 
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -32,13 +32,17 @@ import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.Assume.assumeThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.lenient;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -75,6 +79,7 @@ import 
org.apache.ignite.internal.network.messages.TestMessageTypes;
 import org.apache.ignite.internal.network.messages.TestMessagesFactory;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
 import org.apache.ignite.internal.network.recovery.AllIdsAreFresh;
+import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
 import 
org.apache.ignite.internal.network.recovery.RecoveryInitiatorHandshakeManager;
 import org.apache.ignite.internal.network.recovery.StaleIdDetector;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
@@ -177,15 +182,6 @@ class DefaultMessagingServiceTest extends 
BaseIgniteAbstractTest {
         }
     }
 
-    @Test
-    void respondingWhenSenderIsNotInTopologyResultsInFailingFuture() throws 
Exception {
-        try (Services services = createMessagingService(senderNode, 
senderNetworkConfig)) {
-            CompletableFuture<Void> resultFuture = 
services.messagingService.respond("no-such-node", mock(NetworkMessage.class), 
123);
-
-            assertThat(resultFuture, 
willThrow(UnresolvableConsistentIdException.class));
-        }
-    }
-
     @Test
     public void sendMessagesTwoChannels() throws Exception {
         try (Services senderServices = createMessagingService(senderNode, 
senderNetworkConfig);
@@ -520,6 +516,94 @@ class DefaultMessagingServiceTest extends 
BaseIgniteAbstractTest {
         }
     }
 
+    /**
+     * Tests a case where an explicit staleness check is expected and the recipient has already gone stale. In such a case the
+     * staleness check should lead to {@link RecipientLeftException}.
+     *
+     * <p>The recipient's ID is marked as stale in an {@link InMemoryStaleIds} detector, which is passed to the messaging service
+     * under test. The message is then sent using the recipient's {@link ClusterNodeImpl} instance.
+     */
+    @ParameterizedTest
+    @EnumSource(SendByClusterNodeOperation.class)
+    void sendByClusterNodeToStaleNode(SendByClusterNodeOperation operation) 
throws Exception {
+        UUID missingNodeId = randomUUID();
+
+        var missingNode = new ClusterNodeImpl(missingNodeId, "missing-node", 
new NetworkAddress("127.1.1.1", 2026), null);
+
+        var staleIdDetector = new InMemoryStaleIds();
+        staleIdDetector.markAsStale(missingNodeId);
+
+        try (Services senderServices = createMessagingService(
+                senderNode,
+                senderNetworkConfig,
+                () -> {},
+                messageSerializationRegistry,
+                staleIdDetector
+        )) {
+            assertThat(
+                    operation.sendAction.send(senderServices.messagingService, 
testMessage("test"), missingNode),
+                    willThrow(RecipientLeftException.class)
+            );
+        }
+    }
+
+    /**
+     * Tests a case where an explicit staleness check is not expected and the recipient has already gone stale. In such a case there
+     * should be no staleness check, and we should get an {@link UnresolvableConsistentIdException} because the recipient's name
+     * cannot be resolved in the topology, even though its ID is marked as stale.
+     *
+     * <p>The topology service is stubbed to return {@code null} for the recipient's name, and the test verifies that it was queried.
+     * {@code SEND_BY_ADDRESS} is skipped by an assumption.
+     */
+    @ParameterizedTest
+    @EnumSource(SendByConsistentCoordinateOperation.class)
+    void sendByConsistentIdToStaleNode(SendByConsistentCoordinateOperation 
operation) throws Exception {
+        assumeThat(operation, 
is(not(SendByConsistentCoordinateOperation.SEND_BY_ADDRESS)));
+        // NOTE(review): org.junit.Assume is the JUnit 4 API; consider @EnumSource(mode = EXCLUDE, names = "SEND_BY_ADDRESS") instead.
+        UUID missingNodeId = randomUUID();
+        String missingNodeName = "missing-node";
+
+        var missingNode = new ClusterNodeImpl(missingNodeId, missingNodeName, 
new NetworkAddress("127.1.1.1", 2026), null);
+
+        var staleIdDetector = new InMemoryStaleIds();
+        staleIdDetector.markAsStale(missingNodeId);
+
+        
when(topologyService.getByConsistentId(missingNodeName)).thenReturn(null);
+
+        try (Services senderServices = createMessagingService(
+                senderNode,
+                senderNetworkConfig,
+                () -> {},
+                messageSerializationRegistry,
+                staleIdDetector
+        )) {
+            assertThat(
+                    operation.sendAction.send(senderServices.messagingService, 
testMessage("test"), missingNode),
+                    willThrow(UnresolvableConsistentIdException.class)
+            );
+
+            verify(topologyService, 
atLeastOnce()).getByConsistentId(missingNodeName);
+        }
+    }
+
+    /**
+     * Tests a case where the recipient does not exist in the topology and is addressed by its name. This situation should lead to
+     * {@link UnresolvableConsistentIdException}.
+     *
+     * <p>The topology service is stubbed to return {@code null} for the recipient's name, and the test verifies that it was queried.
+     * {@code SEND_BY_ADDRESS} is skipped by an assumption.
+     */
+    @ParameterizedTest
+    @EnumSource(SendByConsistentCoordinateOperation.class)
+    void sendByConsistentIdToMissingNode(SendByConsistentCoordinateOperation 
operation) throws Exception {
+        assumeThat(operation, 
is(not(SendByConsistentCoordinateOperation.SEND_BY_ADDRESS)));
+        // NOTE(review): org.junit.Assume is the JUnit 4 API; consider @EnumSource(mode = EXCLUDE, names = "SEND_BY_ADDRESS") instead.
+        String missingNodeName = "missing-node";
+
+        var missingNode = new ClusterNodeImpl(randomUUID(), missingNodeName, 
new NetworkAddress("127.1.1.1", 2026), null);
+
+        
when(topologyService.getByConsistentId(missingNodeName)).thenReturn(null);
+
+        try (Services senderServices = createMessagingService(senderNode, 
senderNetworkConfig)) {
+            assertThat(
+                    operation.sendAction.send(senderServices.messagingService, 
testMessage("test"), missingNode),
+                    willThrow(UnresolvableConsistentIdException.class)
+            );
+
+            verify(topologyService, 
atLeastOnce()).getByConsistentId(missingNodeName);
+        }
+    }
+
     private ClusterNodeImpl copyWithDifferentId() {
         return new ClusterNodeImpl(
                 randomUUID(),
@@ -587,16 +671,16 @@ class DefaultMessagingServiceTest extends 
BaseIgniteAbstractTest {
     }
 
     private Services createMessagingService(InternalClusterNode node, 
NetworkConfiguration networkConfig, Runnable beforeHandshake) {
-        return createMessagingService(node, networkConfig, beforeHandshake, 
messageSerializationRegistry);
+        return createMessagingService(node, networkConfig, beforeHandshake, 
messageSerializationRegistry, new AllIdsAreFresh()); // Defaults to AllIdsAreFresh; the five-argument overload takes any StaleIdDetector.
     }
 
     private Services createMessagingService(
             InternalClusterNode node,
             NetworkConfiguration networkConfig,
             Runnable beforeHandshake,
-            MessageSerializationRegistry registry
+            MessageSerializationRegistry registry,
+            StaleIdDetector staleIdDetector
     ) {
-        StaleIdDetector staleIdDetector = new AllIdsAreFresh();
         ClusterIdSupplier clusterIdSupplier = new 
ConstantClusterIdSupplier(clusterId);
 
         ClassDescriptorRegistry classDescriptorRegistry = new 
ClassDescriptorRegistry();

Reply via email to