This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f3f30b79086 IGNITE-28209 Throw RecipientLeftException more often in
MessagingService (#7778)
f3f30b79086 is described below
commit f3f30b79086a6f63e178d1f7098321dcbe2a2516
Author: Ivan Bessonov <[email protected]>
AuthorDate: Mon Mar 16 12:41:01 2026 +0300
IGNITE-28209 Throw RecipientLeftException more often in MessagingService
(#7778)
---
.../internal/network/DefaultMessagingService.java | 21 +++-
.../network/DefaultMessagingServiceTest.java | 110 ++++++++++++++++++---
2 files changed, 116 insertions(+), 15 deletions(-)
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
index 6ac8f8562ca..4794fac012c 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/DefaultMessagingService.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.network;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.function.Function.identity;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.LONG_HANDLING_LOGGING_ENABLED;
import static
org.apache.ignite.internal.metrics.sources.ThreadPoolMetricSource.THREAD_POOLS_METRICS_SOURCE_NAME;
import static
org.apache.ignite.internal.network.NettyBootstrapFactory.isInNetworkThread;
@@ -49,7 +50,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
-import java.util.function.Function;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.future.OrderingFuture;
@@ -324,11 +324,23 @@ public class DefaultMessagingService extends
AbstractMessagingService {
return nullCompletedFuture();
}
+ if (strictIdCheck && staleIdDetector.isIdStale(recipient.id())) {
+ return recipientIsStaleFuture(recipient);
+ }
+
NetworkMessage message = correlationId != null ?
responseFromMessage(msg, correlationId) : msg;
return sendViaNetwork(recipient.id(), type, recipientAddress, message,
strictIdCheck);
}
+ private <U> CompletableFuture<U>
recipientIsStaleFuture(InternalClusterNode recipient) {
+ metrics.incrementMessageRecipientNotFound();
+
+ return failedFuture(
+ new RecipientLeftException("Recipient is stale [name=" +
recipient.name() + ", id=" + recipient.id() + "]")
+ );
+ }
+
private boolean shouldDropMessage(InternalClusterNode recipient,
NetworkMessage msg) {
BiPredicate<String, NetworkMessage> predicate = dropMessagesPredicate;
@@ -375,6 +387,10 @@ public class DefaultMessagingService extends
AbstractMessagingService {
return responseFuture;
}
+ if (strictIdCheck && staleIdDetector.isIdStale(recipient.id())) {
+ return recipientIsStaleFuture(recipient);
+ }
+
InvokeRequest message = requestFromMessage(msg, correlationId);
return sendViaNetwork(recipient.id(), type, recipientAddress, message,
strictIdCheck)
@@ -401,7 +417,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
) {
if (isInNetworkThread()) {
return CompletableFuture.supplyAsync(() -> sendViaNetwork(nodeId,
type, addr, message, strictIdCheck), outboundExecutor)
- .thenCompose(Function.identity());
+ .thenCompose(identity());
}
List<ClassDescriptorMessage> descriptors;
@@ -412,6 +428,7 @@ public class DefaultMessagingService extends
AbstractMessagingService {
return failedFuture(new IgniteException(INTERNAL_ERR, "Failed to
marshal message: " + e.getMessage(), e));
}
+ // TODO IGNITE-28225 Retry channel creation in case of network issues.
OrderingFuture<NettySender> channelFuture =
connectionManager.channel(nodeId, type, addr);
channelFuture.whenComplete((sender, ex) -> maybeLogHandshakeError(ex,
nodeId, type, addr));
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index 66b37398422..e910a7c2557 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -32,13 +32,17 @@ import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.Assume.assumeThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.lenient;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -75,6 +79,7 @@ import
org.apache.ignite.internal.network.messages.TestMessageTypes;
import org.apache.ignite.internal.network.messages.TestMessagesFactory;
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.internal.network.recovery.AllIdsAreFresh;
+import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
import
org.apache.ignite.internal.network.recovery.RecoveryInitiatorHandshakeManager;
import org.apache.ignite.internal.network.recovery.StaleIdDetector;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
@@ -177,15 +182,6 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
}
}
- @Test
- void respondingWhenSenderIsNotInTopologyResultsInFailingFuture() throws
Exception {
- try (Services services = createMessagingService(senderNode,
senderNetworkConfig)) {
- CompletableFuture<Void> resultFuture =
services.messagingService.respond("no-such-node", mock(NetworkMessage.class),
123);
-
- assertThat(resultFuture,
willThrow(UnresolvableConsistentIdException.class));
- }
- }
-
@Test
public void sendMessagesTwoChannels() throws Exception {
try (Services senderServices = createMessagingService(senderNode,
senderNetworkConfig);
@@ -520,6 +516,94 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
}
}
+ /**
+ * Tests a case where an explicit staleness check is expected, and the
recipient has already gone stale. In such a case the staleness check
+ * should lead to {@link RecipientLeftException}.
+ */
+ @ParameterizedTest
+ @EnumSource(SendByClusterNodeOperation.class)
+ void sendByClusterNodeToStaleNode(SendByClusterNodeOperation operation)
throws Exception {
+ UUID missingNodeId = randomUUID();
+
+ var missingNode = new ClusterNodeImpl(missingNodeId, "missing-node",
new NetworkAddress("127.1.1.1", 2026), null);
+
+ var staleIdDetector = new InMemoryStaleIds();
+ staleIdDetector.markAsStale(missingNodeId);
+
+ try (Services senderServices = createMessagingService(
+ senderNode,
+ senderNetworkConfig,
+ () -> {},
+ messageSerializationRegistry,
+ staleIdDetector
+ )) {
+ assertThat(
+ operation.sendAction.send(senderServices.messagingService,
testMessage("test"), missingNode),
+ willThrow(RecipientLeftException.class)
+ );
+ }
+ }
+
+ /**
+ * Tests a case where an explicit staleness check is not expected, and the
recipient has already gone stale. In such a case there should be
+ * no staleness check and we should get a {@link
UnresolvableConsistentIdException}.
+ */
+ @ParameterizedTest
+ @EnumSource(SendByConsistentCoordinateOperation.class)
+ void sendByConsistentIdToStaleNode(SendByConsistentCoordinateOperation
operation) throws Exception {
+ assumeThat(operation,
is(not(SendByConsistentCoordinateOperation.SEND_BY_ADDRESS)));
+
+ UUID missingNodeId = randomUUID();
+ String missingNodeName = "missing-node";
+
+ var missingNode = new ClusterNodeImpl(missingNodeId, missingNodeName,
new NetworkAddress("127.1.1.1", 2026), null);
+
+ var staleIdDetector = new InMemoryStaleIds();
+ staleIdDetector.markAsStale(missingNodeId);
+
+
when(topologyService.getByConsistentId(missingNodeName)).thenReturn(null);
+
+ try (Services senderServices = createMessagingService(
+ senderNode,
+ senderNetworkConfig,
+ () -> {},
+ messageSerializationRegistry,
+ staleIdDetector
+ )) {
+ assertThat(
+ operation.sendAction.send(senderServices.messagingService,
testMessage("test"), missingNode),
+ willThrow(UnresolvableConsistentIdException.class)
+ );
+
+ verify(topologyService,
atLeastOnce()).getByConsistentId(missingNodeName);
+ }
+ }
+
+ /**
+ * Tests a case where a node does not exist in topology, and we use the
name to access it. This situation should lead to
+ * {@link UnresolvableConsistentIdException}.
+ */
+ @ParameterizedTest
+ @EnumSource(SendByConsistentCoordinateOperation.class)
+ void sendByConsistentIdToMissingNode(SendByConsistentCoordinateOperation
operation) throws Exception {
+ assumeThat(operation,
is(not(SendByConsistentCoordinateOperation.SEND_BY_ADDRESS)));
+
+ String missingNodeName = "missing-node";
+
+ var missingNode = new ClusterNodeImpl(randomUUID(), missingNodeName,
new NetworkAddress("127.1.1.1", 2026), null);
+
+
when(topologyService.getByConsistentId(missingNodeName)).thenReturn(null);
+
+ try (Services senderServices = createMessagingService(senderNode,
senderNetworkConfig)) {
+ assertThat(
+ operation.sendAction.send(senderServices.messagingService,
testMessage("test"), missingNode),
+ willThrow(UnresolvableConsistentIdException.class)
+ );
+
+ verify(topologyService,
atLeastOnce()).getByConsistentId(missingNodeName);
+ }
+ }
+
private ClusterNodeImpl copyWithDifferentId() {
return new ClusterNodeImpl(
randomUUID(),
@@ -587,16 +671,16 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
}
private Services createMessagingService(InternalClusterNode node,
NetworkConfiguration networkConfig, Runnable beforeHandshake) {
- return createMessagingService(node, networkConfig, beforeHandshake,
messageSerializationRegistry);
+ return createMessagingService(node, networkConfig, beforeHandshake,
messageSerializationRegistry, new AllIdsAreFresh());
}
private Services createMessagingService(
InternalClusterNode node,
NetworkConfiguration networkConfig,
Runnable beforeHandshake,
- MessageSerializationRegistry registry
+ MessageSerializationRegistry registry,
+ StaleIdDetector staleIdDetector
) {
- StaleIdDetector staleIdDetector = new AllIdsAreFresh();
ClusterIdSupplier clusterIdSupplier = new
ConstantClusterIdSupplier(clusterId);
ClassDescriptorRegistry classDescriptorRegistry = new
ClassDescriptorRegistry();