This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 45102b5209 IGNITE-21523 Send a probe message to server when channel opens at client side (#3345) 45102b5209 is described below commit 45102b52093d4c8a458c556d1065101d72ad9783 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Tue Mar 5 11:55:48 2024 +0400 IGNITE-21523 Send a probe message to server when channel opens at client side (#3345) --- .../internal/network/NetworkMessageTypes.java | 6 ++++ .../internal/network/netty/HandshakeHandler.java | 15 +++++++++- .../internal/network/netty/MessageHandler.java | 7 ++++- .../recovery/RecoveryClientHandshakeManager.java | 30 +++++++++++++++++-- .../recovery/RecoveryServerHandshakeManager.java | 6 ++++ .../network/recovery/message/ProbeMessage.java | 34 ++++++++++++++++++++++ .../network/netty/RecoveryHandshakeTest.java | 12 ++++++++ .../apache/ignite/raft/jraft/core/ItNodeTest.java | 3 +- .../ignite/internal/replicator/ReplicaManager.java | 15 ++++------ .../incoming/IncomingSnapshotCopierTest.java | 3 -- 10 files changed, 112 insertions(+), 19 deletions(-) diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java index 1d9f3f349f..5ef73fe44b 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessag import org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage; import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage; import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage; +import org.apache.ignite.internal.network.recovery.message.ProbeMessage; /** * Message types for the network module. @@ -95,4 +96,9 @@ public class NetworkMessageTypes { * Type for {@link ClusterNodeMessage}. */ public static final short CLUSTER_NODE_MESSAGE = 11; + + /** + * Type for {@link ProbeMessage}. + */ + public static final short PROBE_MESSAGE = 12; } diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java index 38c564d64a..1aba0e80d6 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.netty; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.DecoderException; import java.util.function.Consumer; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -78,7 +79,11 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter { manager.localHandshakeFuture().whenComplete((unused, throwable) -> { if (throwable != null) { - LOG.debug("Error when performing handshake", throwable); + if (unexpectedException(throwable)) { + LOG.error("Error when performing handshake", throwable); + } else { + LOG.debug("Error when performing handshake", throwable); + } ctx.close(); } @@ -87,6 +92,14 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter { ctx.fireChannelActive(); } + private static boolean unexpectedException(Throwable ex) { + return ex instanceof Error + || ex instanceof DecoderException + || ex instanceof NullPointerException + || ex instanceof IllegalArgumentException + || ex instanceof IllegalStateException; + } + /** {@inheritDoc} */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java index 762232eca1..c6c3c3a8dd 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.function.Consumer; import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage; +import org.apache.ignite.internal.network.recovery.message.ProbeMessage; import org.apache.ignite.internal.network.serialization.PerSessionSerializationService; import org.apache.ignite.network.ClusterNode; @@ -66,7 +67,7 @@ public class MessageHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) { NetworkMessage message = (NetworkMessage) msg; - if (message instanceof AcknowledgementMessage) { + if (notPayloadMessage(message)) { return; } @@ -74,4 +75,8 @@ public class MessageHandler extends ChannelInboundHandlerAdapter { new InNetworkObject(message, remoteNode, connectionIndex, serializationService.compositeDescriptorRegistry()) ); } + + private static boolean notPayloadMessage(NetworkMessage message) { + return message instanceof AcknowledgementMessage || message instanceof ProbeMessage; + } } diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java index 51ccd202ed..65ee4373ee 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java @@ -21,12 +21,14 @@ import static java.util.Collections.emptyList; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR; +import static org.apache.ignite.internal.network.netty.NettyUtils.toCompletableFuture; import static org.apache.ignite.internal.network.recovery.HandshakeManagerUtils.clusterNodeToMessage; import static org.apache.ignite.internal.network.recovery.HandshakeManagerUtils.switchEventLoopIfNeeded; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; +import java.io.IOException; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -50,13 +52,13 @@ import org.apache.ignite.internal.network.netty.ChannelKey; import org.apache.ignite.internal.network.netty.HandshakeHandler; import org.apache.ignite.internal.network.netty.MessageHandler; import org.apache.ignite.internal.network.netty.NettySender; -import org.apache.ignite.internal.network.netty.NettyUtils; import org.apache.ignite.internal.network.netty.PipelineUtils; import org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage; import org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage; import org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason; import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage; import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage; +import org.apache.ignite.internal.network.recovery.message.ProbeMessage; import org.apache.ignite.network.ClusterNode; import org.jetbrains.annotations.TestOnly; @@ -169,6 +171,30 @@ public class RecoveryClientHandshakeManager implements HandshakeManager { this.handler = (HandshakeHandler) ctx.handler(); } + @Override + public void onConnectionOpen() { + // Sending a probe to make sure we detect a channel that ends up in a strange state upon creation: + // the client sees it as a normally open channel, but the server (at least, Netty) did not even notice that it accepted it. + // This happens if the client tries to connect a server that is stopping its network (and closing its server socket) just + // the same exact moment, but then starts its network (binding to the port again) still staying in the same OS process. + sendProbeToServer(); + } + + private void sendProbeToServer() { + ProbeMessage probe = MESSAGE_FACTORY.probeMessage().build(); + + toCompletableFuture(channel.writeAndFlush(new OutNetworkObject(probe, List.of(), false))).whenComplete((res, ex) -> { + if (ex != null) { + if (ex instanceof IOException) { + // We don't care: the channel will be reopened. + LOG.debug("Could not send a probe message via {}", ex, channel); + } else { + LOG.info("Could not send a probe message via {}", ex, channel); + } + } + }); + } + /** {@inheritDoc} */ @Override public void onMessage(NetworkMessage message) { @@ -400,7 +426,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager { ChannelFuture sendFuture = ctx.channel().writeAndFlush(new OutNetworkObject(response, emptyList(), false)); - NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> { + toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> { if (throwable != null) { localHandshakeCompleteFuture.completeExceptionally( new HandshakeException("Failed to send handshake response: " + throwable.getMessage(), throwable) diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java index c5d110db4e..c1d5abe171 100644 --- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMess import org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason; import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage; import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage; +import org.apache.ignite.internal.network.recovery.message.ProbeMessage; import org.apache.ignite.network.ClusterNode; /** @@ -181,6 +182,11 @@ public class RecoveryServerHandshakeManager implements HandshakeManager { /** {@inheritDoc} */ @Override public void onMessage(NetworkMessage message) { + if (message instanceof ProbeMessage) { + // No action required, just ignore it. + return; + } + if (message instanceof HandshakeRejectedMessage) { onHandshakeRejectedMessage((HandshakeRejectedMessage) message); diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/ProbeMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/ProbeMessage.java new file mode 100644 index 0000000000..bdb1ad7c43 --- /dev/null +++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/ProbeMessage.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.recovery.message; + +import static org.apache.ignite.internal.network.NetworkMessageTypes.PROBE_MESSAGE; + +import org.apache.ignite.internal.network.annotations.Transferable; + +/** + * Sent to make sure the established channel is still alive. + */ +@Transferable(PROBE_MESSAGE) +public interface ProbeMessage extends InternalMessage { + /** + * A dummy field to overcome inability to send 'empty' messages. + * TODO: remove when IGNITE-21667 is fixed. + */ + byte dummy(); +} diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java index c9435b05a1..c0f0d36252 100644 --- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java +++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java @@ -104,6 +104,7 @@ public class RecoveryHandshakeTest extends BaseIgniteAbstractTest { assertTrue(serverSideChannel.isActive()); + exchangeClientToServer(serverSideChannel, clientSideChannel); exchangeServerToClient(serverSideChannel, clientSideChannel); exchangeClientToServer(serverSideChannel, clientSideChannel); exchangeServerToClient(serverSideChannel, clientSideChannel); @@ -154,6 +155,7 @@ public class RecoveryHandshakeTest extends BaseIgniteAbstractTest { assertTrue(serverSideChannel.isActive()); + exchangeClientToServer(serverSideChannel, clientSideChannel); exchangeServerToClient(serverSideChannel, clientSideChannel); exchangeClientToServer(serverSideChannel, clientSideChannel); exchangeServerToClient(serverSideChannel, clientSideChannel); @@ -214,6 +216,7 @@ public class RecoveryHandshakeTest extends BaseIgniteAbstractTest { assertTrue(serverSideChannel.isActive()); + exchangeClientToServer(serverSideChannel, clientSideChannel); exchangeServerToClient(serverSideChannel, clientSideChannel); exchangeClientToServer(serverSideChannel, clientSideChannel); exchangeServerToClient(serverSideChannel, clientSideChannel); @@ -271,6 +274,9 @@ public class RecoveryHandshakeTest extends BaseIgniteAbstractTest { setupChannel(channel2Src, chm2, noMessageListener); setupChannel(channel2Dst, shm1, noMessageListener); + exchangeClientToServer(channel2Dst, channel2Src); + exchangeClientToServer(channel1Dst, channel1Src); + exchangeServerToClient(channel2Dst, channel2Src); exchangeServerToClient(channel1Dst, channel1Src); @@ -332,10 +338,12 @@ public class RecoveryHandshakeTest extends BaseIgniteAbstractTest { setupChannel(channel2Dst, shm1, noMessageListener); // Channel 2's handshake acquires both locks. + exchangeClientToServer(channel2Dst, channel2Src); exchangeServerToClient(channel2Dst, channel2Src); exchangeClientToServer(channel2Dst, channel2Src); // Now Channel 1's handshake cannot acquire even first lock. + exchangeClientToServer(channel1Dst, channel1Src); exchangeServerToClient(channel1Dst, channel1Src); // 2 -> 1 is alive, while 1 -> 2 closes because it is late. @@ -401,6 +409,7 @@ public class RecoveryHandshakeTest extends BaseIgniteAbstractTest { setupChannel(serverSideChannel, serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener1); // Normal handshake + exchangeClientToServer(serverSideChannel, clientSideChannel); exchangeServerToClient(serverSideChannel, clientSideChannel); exchangeClientToServer(serverSideChannel, clientSideChannel); exchangeServerToClient(serverSideChannel, clientSideChannel); @@ -446,6 +455,7 @@ public class RecoveryHandshakeTest extends BaseIgniteAbstractTest { setupChannel(serverSideChannel, serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener2); // Handshake + exchangeClientToServer(serverSideChannel, clientSideChannel); exchangeServerToClient(serverSideChannel, clientSideChannel); exchangeClientToServer(serverSideChannel, clientSideChannel); exchangeServerToClient(serverSideChannel, clientSideChannel); @@ -495,6 +505,7 @@ public class RecoveryHandshakeTest extends BaseIgniteAbstractTest { assertTrue(serverSideChannel.isActive()); + exchangeClientToServer(serverSideChannel, clientSideChannel); exchangeServerToClient(serverSideChannel, clientSideChannel); exchangeClientToServer(serverSideChannel, clientSideChannel); exchangeServerToClient(serverSideChannel, clientSideChannel); @@ -534,6 +545,7 @@ public class RecoveryHandshakeTest extends BaseIgniteAbstractTest { assertTrue(serverSideChannel.isActive()); + exchangeClientToServer(serverSideChannel, clientSideChannel); exchangeServerToClient(serverSideChannel, clientSideChannel); exchangeClientToServer(serverSideChannel, clientSideChannel); diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java index 8c93a2753b..f3a6beb37f 100644 --- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java +++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java @@ -76,6 +76,7 @@ import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.StaticNodeFinder; +import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; import org.apache.ignite.internal.raft.JraftGroupEventsListener; import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory; import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory; @@ -126,7 +127,6 @@ import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper; import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy; import org.apache.ignite.raft.jraft.util.Utils; import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup; -import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -1770,7 +1770,6 @@ public class ItNodeTest extends BaseIgniteAbstractTest { } @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-21457") public void testSetPeer2() throws Exception { List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3); diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index f03415149e..e2017f576f 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -234,8 +234,6 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc return; } - String senderConsistentId = sender.name(); - assert correlationId != null; ReplicaRequest request = (ReplicaRequest) message; @@ -244,9 +242,9 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc // and writes. // But if this is a local call (in the same Ignite instance), we might still be in a thread that does not have those permissions. if (currentThreadCannotDoStorageReadsAndWrites()) { - requestsExecutor.execute(() -> handleReplicaRequest(request, senderConsistentId, correlationId)); + requestsExecutor.execute(() -> handleReplicaRequest(request, sender, correlationId)); } else { - handleReplicaRequest(request, senderConsistentId, correlationId); + handleReplicaRequest(request, sender, correlationId); } } @@ -260,7 +258,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc } } - private void handleReplicaRequest(ReplicaRequest request, String senderConsistentId, @Nullable Long correlationId) { + private void handleReplicaRequest(ReplicaRequest request, ClusterNode sender, @Nullable Long correlationId) { if (!busyLock.enterBusy()) { if (LOG.isInfoEnabled()) { LOG.info("Failed to process replica request (the node is stopping) [request={}].", request); @@ -269,6 +267,8 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc return; } + String senderConsistentId = sender.name(); + try { // Notify the sender that the Replica is created and ready to process requests. if (request instanceof AwaitReplicaRequest) { @@ -319,11 +319,6 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc // replicaFut is always completed here. Replica replica = replicaFut.join(); - // TODO IGNITE-20296 Id of the node should come along with the message itself. - ClusterNode sender = clusterNetSvc.topologyService().getByConsistentId(senderConsistentId); - - assert sender != null : "The sender is undefined (should be fixed by https://issues.apache.org/jira/browse/IGNITE-20296 )"; - String senderId = sender.id(); CompletableFuture<ReplicaResult> resFut = replica.processRequest(request, senderId); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java index 53ab362f45..59127a4b54 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java @@ -106,7 +106,6 @@ import org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage; import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.TopologyService; -import org.apache.ignite.raft.jraft.RaftMessagesFactory; import org.apache.ignite.raft.jraft.Status; import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta; import org.apache.ignite.raft.jraft.error.RaftError; @@ -137,8 +136,6 @@ public class IncomingSnapshotCopierTest extends BaseIgniteAbstractTest { private static final TableMessagesFactory TABLE_MSG_FACTORY = new TableMessagesFactory(); - private static final RaftMessagesFactory RAFT_MSG_FACTORY = new RaftMessagesFactory(); - private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final ClusterNode clusterNode = mock(ClusterNode.class);