This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 29dedf63f6 IGNITE-20852 Simultaneous incoming and outgoing connection
attempts may cause connection failure (#2850)
29dedf63f6 is described below
commit 29dedf63f6699bf9705874de7b9e990b4b0ecddc
Author: Sergey Chugunov <[email protected]>
AuthorDate: Thu Nov 23 18:17:58 2023 +0400
IGNITE-20852 Simultaneous incoming and outgoing connection attempts may
cause connection failure (#2850)
* Handshake protocol is extended to allow a node losing a clinch to notify its
origin
* As a result of a handshake, the caller always gets a NettySender, even if
the caller lost the clinch
* If, during a handshake, a party cannot obtain a lock at its side, it
gives way to the competitor unconditionally (as the competitor has advanced
further)
Signed-off-by: Sergey Chugunov <[email protected]>
---
.../network/netty/ItConnectionManagerTest.java | 41 +---
.../network/handshake/HandshakeManager.java | 18 +-
.../internal/network/netty/ConnectionManager.java | 49 +++++
.../internal/network/netty/HandshakeHandler.java | 6 +-
.../ignite/internal/network/netty/NettyClient.java | 2 +-
.../network/recovery/DescriptorAcquiry.java | 67 +++++++
.../network/recovery/HandshakeTieBreaker.java | 13 +-
.../recovery/RecoveryClientHandshakeManager.java | 155 ++++++++++-----
.../network/recovery/RecoveryDescriptor.java | 35 +++-
.../recovery/RecoveryServerHandshakeManager.java | 127 +++++++++----
.../recovery/message/HandshakeRejectedMessage.java | 20 +-
...dMessage.java => HandshakeRejectionReason.java} | 28 ++-
.../ignite/network/DefaultMessagingService.java | 23 +--
.../internal/network/netty/NettyClientTest.java | 8 +-
.../internal/network/netty/NettyServerTest.java | 5 +-
.../network/netty/RecoveryHandshakeTest.java | 108 +++++++++--
.../network/recovery/DescriptorAcquiryTest.java | 56 ++++++
.../RecoveryClientHandshakeManagerTest.java | 207 +++++++++++++++++++++
.../network/recovery/RecoveryDescriptorTest.java | 141 ++++++++++++++
.../RecoveryServerHandshakeManagerTest.java | 169 +++++++++++++++++
20 files changed, 1070 insertions(+), 208 deletions(-)
diff --git
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 9d48f98304..399b216e09 100644
---
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -70,6 +70,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
/**
@@ -321,6 +322,7 @@ public class ItConnectionManagerTest extends
BaseIgniteAbstractTest {
* @throws Exception If failed.
*/
@RepeatedTest(100)
+ @Timeout(10)
public void testOneChannelLeftIfConnectToEachOther() throws Exception {
try (
ConnectionManagerWrapper manager1 = startManager(4000);
@@ -329,42 +331,11 @@ public class ItConnectionManagerTest extends
BaseIgniteAbstractTest {
CompletableFuture<NettySender> fut1 =
manager1.openChannelTo(manager2).toCompletableFuture();
CompletableFuture<NettySender> fut2 =
manager2.openChannelTo(manager1).toCompletableFuture();
- NettySender sender1 = null;
- NettySender sender2 = null;
+ NettySender sender1 = fut1.get(1, TimeUnit.SECONDS);
+ NettySender sender2 = fut2.get(1, TimeUnit.SECONDS);
- try {
- sender1 = fut1.get(1, TimeUnit.SECONDS);
- } catch (Exception ignored) {
- // No-op.
- }
- try {
- sender2 = fut2.get(1, TimeUnit.SECONDS);
- } catch (Exception ignored) {
- // No-op.
- }
-
- NettySender highlander = null;
-
- assertTrue(sender1 != null || sender2 != null);
-
- if (sender1 != null && sender1.isOpen()) {
- highlander = sender1;
-
- boolean sender2NullOrClosed = sender2 == null ||
!sender2.isOpen();
-
- assertTrue(sender2NullOrClosed);
- }
-
- if (sender2 != null && sender2.isOpen()) {
- highlander = sender2;
-
- boolean sender1NullOrClosed = sender1 == null ||
!sender1.isOpen();
-
- assertTrue(sender1NullOrClosed);
- }
-
- assertNotNull(highlander);
- assertTrue(highlander.isOpen());
+ assertTrue(sender1.isOpen());
+ assertTrue(sender2.isOpen());
assertTrue(
waitForCondition(
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java
index 30b3278288..c39930e52c 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.handshake;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.network.NetworkMessage;
@@ -48,9 +49,20 @@ public interface HandshakeManager {
void onMessage(NetworkMessage message);
/**
- * Returns future that represents the handshake operation.
+ * Returns local future that represents the handshake operation. This is
the future that
+ * gets completed when the handshake itself terminates either successfully
or with an exception.
+ * This is used to complete the current handshake; to get the final
outcome of the connection attempt
+ * please use {@link #finalHandshakeFuture()}.
*
- * @return Future that represents the handshake operation.
+ * @return Local future that represents the handshake operation.
*/
- CompletableFuture<NettySender> handshakeFuture();
+ CompletableFuture<NettySender> localHandshakeFuture();
+
+ /**
+ * Returns final future that represents the handshake operation. This
represents completion of either
+ * current handshake or the inverse handshake if it wins (and the current
one loses).
+ *
+ * @return Final future that represents the handshake operation.
+ */
+ CompletionStage<NettySender> finalHandshakeFuture();
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index aedaa070ea..6587b91bde 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.network.netty;
+import static java.util.function.Function.identity;
import static org.apache.ignite.network.ChannelType.getChannel;
import io.netty.bootstrap.Bootstrap;
@@ -40,6 +41,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.configuration.NetworkView;
+import
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
import
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManagerFactory;
@@ -65,6 +67,8 @@ public class ConnectionManager implements
ChannelCreationListener {
/** Latest version of the direct marshalling protocol. */
public static final byte DIRECT_PROTOCOL_VERSION = 1;
+ private static final int MAX_RETRIES_TO_OPEN_CHANNEL = 10;
+
/** Client bootstrap. */
private final Bootstrap clientBootstrap;
@@ -223,6 +227,51 @@ public class ConnectionManager implements
ChannelCreationListener {
* @return Sender.
*/
public OrderingFuture<NettySender> channel(@Nullable String consistentId,
ChannelType type, InetSocketAddress address) {
+ return getChannelWithRetry(consistentId, type, address, 0);
+ }
+
+ private OrderingFuture<NettySender> getChannelWithRetry(
+ @Nullable String consistentId,
+ ChannelType type,
+ InetSocketAddress address,
+ int attempt
+ ) {
+ if (attempt > MAX_RETRIES_TO_OPEN_CHANNEL) {
+ return OrderingFuture.failedFuture(new IllegalStateException("Too
many attempts to open channel to " + consistentId));
+ }
+
+ return doGetChannel(consistentId, type, address)
+ .handle((res, ex) -> {
+ if (ex instanceof ChannelAlreadyExistsException) {
+ return
getChannelWithRetry(((ChannelAlreadyExistsException) ex).consistentId(), type,
address, attempt + 1);
+ }
+ if (ex != null && ex.getCause() instanceof
ChannelAlreadyExistsException) {
+ return getChannelWithRetry(
+ ((ChannelAlreadyExistsException)
ex.getCause()).consistentId(),
+ type,
+ address,
+ attempt + 1
+ );
+ }
+ if (ex != null) {
+ return OrderingFuture.<NettySender>failedFuture(ex);
+ }
+
+ assert res != null;
+ if (res.isOpen()) {
+ return OrderingFuture.completedFuture(res);
+ } else {
+ return getChannelWithRetry(res.consistentId(), type,
address, attempt + 1);
+ }
+ })
+ .thenCompose(identity());
+ }
+
+ private OrderingFuture<NettySender> doGetChannel(
+ @Nullable String consistentId,
+ ChannelType type,
+ InetSocketAddress address
+ ) {
// Problem is we can't look up a channel by consistent id because
consistent id is not known yet.
if (consistentId != null) {
// If consistent id is known, try looking up a channel by
consistent id. There can be an outbound connection
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
index 83b60e4834..de57be659d 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
@@ -75,7 +75,7 @@ public class HandshakeHandler extends
ChannelInboundHandlerAdapter {
throw e;
}
- manager.handshakeFuture().whenComplete((unused, throwable) -> {
+ manager.localHandshakeFuture().whenComplete((unused, throwable) -> {
if (throwable != null) {
LOG.debug("Error when performing handshake", throwable);
@@ -97,7 +97,7 @@ public class HandshakeHandler extends
ChannelInboundHandlerAdapter {
public void channelInactive(ChannelHandlerContext ctx) {
// If this method is called that means channel has been closed before
handshake has finished or handshake
// has failed.
- manager.handshakeFuture().completeExceptionally(
+ manager.localHandshakeFuture().completeExceptionally(
new HandshakeException("Channel has been closed before
handshake has finished or handshake has failed")
);
@@ -107,7 +107,7 @@ public class HandshakeHandler extends
ChannelInboundHandlerAdapter {
/** {@inheritDoc} */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- manager.handshakeFuture().completeExceptionally(cause);
+ manager.localHandshakeFuture().completeExceptionally(cause);
}
/**
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
index 34dba15646..2748b3f4ea 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
@@ -146,7 +146,7 @@ public class NettyClient {
} else if (throwable != null) {
return
CompletableFuture.<NettySender>failedFuture(throwable);
} else {
- return handshakeManager.handshakeFuture();
+ return handshakeManager.finalHandshakeFuture();
}
}
})
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiry.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiry.java
new file mode 100644
index 0000000000..0b893eb193
--- /dev/null
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiry.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.ignite.internal.network.netty.NettySender;
+
+/**
+ * Context around a fact that a {@link RecoveryDescriptor} is acquired by some
channel.
+ */
+class DescriptorAcquiry {
+ private final Channel channel;
+ private final CompletableFuture<NettySender> handshakeCompleteFuture;
+
+ private final CompletableFuture<Void> clinchResolved = new
CompletableFuture<>();
+
+ DescriptorAcquiry(Channel channel, CompletableFuture<NettySender>
handshakeCompleteFuture) {
+ this.channel = channel;
+ this.handshakeCompleteFuture = handshakeCompleteFuture;
+ }
+
+ /**
+ * Returns the channel that owns the descriptor.
+ */
+ Channel channel() {
+ return channel;
+ }
+
+ /**
+ * Returns a completion stage that gets completed when a clinch associated
with this acquiry is resolved
+ * (that is, the owning handshake gave up and released the recovery
descriptor).
+ */
+ CompletionStage<Void> clinchResolved() {
+ return clinchResolved;
+ }
+
+ /**
+ * Signals that the owner of this recovery descriptor gave up and, hence,
the clinch has been resolved.
+ */
+ void markClinchResolved() {
+ clinchResolved.complete(null);
+ }
+
+ /**
+ * Returns the future that gets completed when the handshake performed by
the owner of the descriptor completes.
+ */
+ CompletionStage<NettySender> handshakeCompleteFuture() {
+ return handshakeCompleteFuture;
+ }
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java
index e3bf6018ef..6210b5793b 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java
@@ -20,15 +20,18 @@ package org.apache.ignite.internal.network.recovery;
import java.util.UUID;
/**
- * The HandshakeTieBreaker class provides a mechanism for determining whether
an existing channel should be closed during a handshake
- * process.
+ * The HandshakeTieBreaker class provides a mechanism for determining whether
an existing channel should be closed in case of a clinch
+ * during a handshake process.
+ *
+ * <p>A clinch is a situation when two parallel handshakes (one from node A to
B, another from B to A) acquire locks (now these are
+ * recovery descriptors) on different sides, then each of them tries to take a
lock on the opposite side, which is impossible as
+ * it's already held by the corresponding competitor. To resolve such a
deadlock, one of the handshakes must be terminated.
*/
class HandshakeTieBreaker {
/**
* Determines whether an existing channel should be closed based on the
comparison of the server's launch id and the client's launch id.
- * If the client's launch id is greater than the server's launch id, the
existing channel should be closed in favor of the new one. If
- * the server's launch id is less than or equal to the client's launch id,
the existing channel should be closed in favor of the new
- * one.
+ * If the client's launch id is greater than the server's launch id, the
existing channel should be closed in favor of the new one;
+ * otherwise, the new channel should be closed.
*
* @param serverLaunchId Server's launch id.
* @param clientLaunchId Client's launch id.
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 67f03822d8..040e44ccaa 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.network.recovery;
import static java.util.Collections.emptyList;
-import static
org.apache.ignite.internal.network.recovery.HandshakeTieBreaker.shouldCloseChannel;
+import static java.util.function.Function.identity;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.network.netty.NettyUtils;
import org.apache.ignite.internal.network.netty.PipelineUtils;
import
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
+import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
import
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
import org.apache.ignite.lang.IgniteException;
@@ -74,7 +76,13 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
private final short connectionId;
/** Handshake completion future. */
- private final CompletableFuture<NettySender> handshakeCompleteFuture = new
CompletableFuture<>();
+ private final CompletableFuture<NettySender> localHandshakeCompleteFuture
= new CompletableFuture<>();
+
+ /**
+ * Master future used to complete the handshake either with the results of
this handshake or of the competing one
+ * (in the opposite direction), if it wins.
+ */
+ private final CompletableFuture<CompletionStage<NettySender>>
masterHandshakeCompleteFuture = new CompletableFuture<>();
/** Remote node's launch id. */
private UUID remoteLaunchId;
@@ -120,10 +128,13 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
this.staleIdDetector = staleIdDetector;
this.stopping = stopping;
- this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
+ localHandshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
if (throwable != null) {
releaseResources();
+ // Complete the master future if it has not yet been completed
by the competitor.
+
masterHandshakeCompleteFuture.complete(localHandshakeCompleteFuture);
+
return;
}
@@ -159,22 +170,7 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
}
if (message instanceof HandshakeRejectedMessage) {
- HandshakeRejectedMessage msg = (HandshakeRejectedMessage) message;
-
- boolean ignorable = stopping.get() || !msg.critical();
-
- if (ignorable) {
- LOG.debug("Handshake rejected by server: {}", msg.reason());
- } else {
- LOG.warn("Handshake rejected by server: {}", msg.reason());
- }
-
- handshakeCompleteFuture.completeExceptionally(new
HandshakeException(msg.reason()));
-
- if (!ignorable) {
- // TODO: IGNITE-16899 Perhaps we need to fail the node by
FailureHandler
- failureHandler.handleFailure(new IgniteException("Handshake
rejected by server: " + msg.reason()));
- }
+ onHandshakeRejectedMessage((HandshakeRejectedMessage) message);
return;
}
@@ -237,24 +233,23 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
connectionId
);
- while (!descriptor.acquire(ctx)) {
- if (shouldCloseChannel(remoteLaunchId, launchId)) {
- Channel holderChannel = descriptor.holderChannel();
-
- if (holderChannel == null) {
- continue;
- }
+ while (!descriptor.acquire(ctx, localHandshakeCompleteFuture)) {
+ // Don't use the tie-breaking logic as this handshake attempt is
late: the competitor has already acquired
+ // recovery descriptors on both sides, so this handshake attempt
must fail regardless of the Tie Breaker's opinion.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to acquire recovery descriptor during
handshake, it is held by: {}.", descriptor.holderDescription());
+ }
- holderChannel.close().awaitUninterruptibly();
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info("Failed to acquire recovery descriptor during
handshake, it is held by: {}", descriptor.holderDescription());
- }
+ DescriptorAcquiry competitorAcquiry = descriptor.holder();
+ if (competitorAcquiry == null) {
+ continue;
+ }
- handshakeCompleteFuture.completeExceptionally(new
ChannelAlreadyExistsException(remoteConsistentId));
+ // Complete our master future with the competitor's future. After
this our local future has no effect on the final result
+ // of this handshake.
+
completeMasterFutureWithCompetitorHandshakeFuture(competitorAcquiry);
- return;
- }
+ return;
}
this.recoveryDescriptor = descriptor;
@@ -262,25 +257,81 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
handshake(this.recoveryDescriptor);
}
+ private void
completeMasterFutureWithCompetitorHandshakeFuture(DescriptorAcquiry
competitorAcquiry) {
+
masterHandshakeCompleteFuture.complete(competitorAcquiry.handshakeCompleteFuture());
+ localHandshakeCompleteFuture.completeExceptionally(
+ new HandshakeException("Stepping aside to allow an incoming
handshake from " + remoteConsistentId + " finish.")
+ );
+ }
+
private void handleStaleServerId(HandshakeStartMessage msg) {
- String reason = msg.consistentId() + ":" + msg.launchId() + " is
stale, server should be restarted so that clients can connect";
+ String message = msg.consistentId() + ":" + msg.launchId() + " is
stale, server should be restarted so that clients can connect";
HandshakeRejectedMessage rejectionMessage =
MESSAGE_FACTORY.handshakeRejectedMessage()
- .critical(true)
- .reason(reason)
+ .reasonString(HandshakeRejectionReason.STALE_LAUNCH_ID.name())
+ .message(message)
.build();
- sendHandshakeRejectedMessage(rejectionMessage, reason);
+ sendHandshakeRejectedMessage(rejectionMessage, message);
}
private void
handleRefusalToEstablishConnectionDueToStopping(HandshakeStartMessage msg) {
- String reason = msg.consistentId() + ":" + msg.launchId() + " tried to
establish a connection with " + consistentId
+ String message = msg.consistentId() + ":" + msg.launchId() + " tried
to establish a connection with " + consistentId
+ ", but it's stopping";
HandshakeRejectedMessage rejectionMessage =
MESSAGE_FACTORY.handshakeRejectedMessage()
- .critical(false)
- .reason(reason)
+ .reasonString(HandshakeRejectionReason.STOPPING.name())
+ .message(message)
.build();
- sendHandshakeRejectedMessage(rejectionMessage, reason);
+ sendHandshakeRejectedMessage(rejectionMessage, message);
+ }
+
+ private void onHandshakeRejectedMessage(HandshakeRejectedMessage msg) {
+ boolean ignorable = stopping.get() || !msg.reason().critical();
+
+ if (ignorable) {
+ LOG.debug("Handshake rejected by server: {}", msg.message());
+ } else {
+ LOG.warn("Handshake rejected by server: {}", msg.message());
+ }
+
+ if (msg.reason() == HandshakeRejectionReason.CLINCH) {
+ giveUpClinch();
+ } else {
+ localHandshakeCompleteFuture.completeExceptionally(new
HandshakeException(msg.message()));
+ }
+
+ if (!ignorable) {
+ // TODO: IGNITE-16899 Perhaps we need to fail the node by
FailureHandler
+ failureHandler.handleFailure(new IgniteException("Handshake
rejected by server: " + msg.message()));
+ }
+ }
+
+ private void giveUpClinch() {
+ RecoveryDescriptor descriptor =
recoveryDescriptorProvider.getRecoveryDescriptor(
+ remoteConsistentId,
+ remoteLaunchId,
+ connectionId
+ );
+
+ DescriptorAcquiry myAcquiry = descriptor.holder();
+ assert myAcquiry != null;
+ assert myAcquiry.channel() == ctx.channel() : "Expected the descriptor
to be held by current channel " + ctx.channel()
+ + ", but it's held by another channel " + myAcquiry.channel();
+
+ descriptor.release(ctx);
+
+ // Complete the future to allow the competitor that should wait on it
to acquire the descriptor and finish its handshake.
+ myAcquiry.markClinchResolved();
+
+ DescriptorAcquiry competitorAcquiry = descriptor.holder();
+ if (competitorAcquiry != null) {
+ // The competitor is available, so just complete our master future
with the competitor future.
+
completeMasterFutureWithCompetitorHandshakeFuture(competitorAcquiry);
+ } else {
+ // The competitor is not at the lock yet. Maybe it will arrive
soon, maybe it will never arrive.
+ // The safest thing is to just retry the whole handshake procedure.
+ localHandshakeCompleteFuture.completeExceptionally(new
ChannelAlreadyExistsException(remoteConsistentId));
+ }
}
private void sendHandshakeRejectedMessage(HandshakeRejectedMessage
rejectionMessage, String reason) {
@@ -288,19 +339,25 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused,
throwable) -> {
if (throwable != null) {
- handshakeCompleteFuture.completeExceptionally(
+ localHandshakeCompleteFuture.completeExceptionally(
new HandshakeException("Failed to send handshake
rejected message: " + throwable.getMessage(), throwable)
);
} else {
- handshakeCompleteFuture.completeExceptionally(new
HandshakeException(reason));
+ localHandshakeCompleteFuture.completeExceptionally(new
HandshakeException(reason));
}
});
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<NettySender> handshakeFuture() {
- return handshakeCompleteFuture;
+ public CompletableFuture<NettySender> localHandshakeFuture() {
+ return localHandshakeCompleteFuture;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletionStage<NettySender> finalHandshakeFuture() {
+ return masterHandshakeCompleteFuture.thenCompose(identity());
}
private void handshake(RecoveryDescriptor descriptor) {
@@ -317,7 +374,7 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused,
throwable) -> {
if (throwable != null) {
- handshakeCompleteFuture.completeExceptionally(
+ localHandshakeCompleteFuture.completeExceptionally(
new HandshakeException("Failed to send handshake
response: " + throwable.getMessage(), throwable)
);
}
@@ -340,6 +397,8 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
// Removes handshake handler from the pipeline as the handshake is
finished
this.ctx.pipeline().remove(this.handler);
- handshakeCompleteFuture.complete(new NettySender(channel,
remoteLaunchId.toString(), remoteConsistentId, connectionId));
+ // Complete the master future with the local future of the current
handshake as there was no competitor (or we won the competition).
+ masterHandshakeCompleteFuture.complete(localHandshakeCompleteFuture);
+ localHandshakeCompleteFuture.complete(new NettySender(channel,
remoteLaunchId.toString(), remoteConsistentId, connectionId));
}
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
index 463f2ed49c..33c03fad2a 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
@@ -23,7 +23,9 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.network.OutNetworkObject;
import org.jetbrains.annotations.Nullable;
@@ -44,8 +46,8 @@ public class RecoveryDescriptor {
/** Count of received messages. */
private long receivedCount;
- /** Current owner channel of this descriptor. */
- private final AtomicReference<Channel> channelHolder = new
AtomicReference<>();
+ /** Some context around current owner channel of this descriptor. */
+ private final AtomicReference<DescriptorAcquiry> channelHolder = new
AtomicReference<>();
/**
* Constructor.
@@ -138,22 +140,35 @@ public class RecoveryDescriptor {
* @param ctx Channel handler context.
*/
public void release(ChannelHandlerContext ctx) {
- channelHolder.compareAndSet(ctx.channel(), null);
+ DescriptorAcquiry oldAcquiry = channelHolder.getAndUpdate(acquiry -> {
+ if (acquiry != null && acquiry.channel() == ctx.channel()) {
+ return null;
+ }
+
+ return acquiry;
+ });
+
+ if (oldAcquiry != null && oldAcquiry.channel() == ctx.channel()) {
+ // We have successfully released the descriptor.
+ // Let's mark the clinch resolved just in case.
+ oldAcquiry.markClinchResolved();
+ }
}
/**
* Acquire this descriptor.
*
* @param ctx Channel handler context.
+ * @param handshakeCompleteFuture Future that gets completed when the
corresponding handshake completes.
*/
- public boolean acquire(ChannelHandlerContext ctx) {
- return channelHolder.compareAndSet(null, ctx.channel());
+ public boolean acquire(ChannelHandlerContext ctx,
CompletableFuture<NettySender> handshakeCompleteFuture) {
+ return channelHolder.compareAndSet(null, new
DescriptorAcquiry(ctx.channel(), handshakeCompleteFuture));
}
/**
- * Returns the channel, that holds this descriptor.
+ * Returns context around the channel that holds this descriptor.
*/
- @Nullable Channel holderChannel() {
+ @Nullable DescriptorAcquiry holder() {
return channelHolder.get();
}
@@ -161,13 +176,13 @@ public class RecoveryDescriptor {
* Returns {@code toString()} representation of a {@link Channel}, that
holds this descriptor.
*/
String holderDescription() {
- Channel channel = channelHolder.get();
+ DescriptorAcquiry acquiry = channelHolder.get();
- if (channel == null) {
+ if (acquiry == null) {
// This can happen if channel was already closed and it released
the descriptor.
return "No channel";
}
- return channel.toString();
+ return acquiry.channel().toString();
}
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index f65948eed2..36693c9cea 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -40,6 +41,7 @@ import org.apache.ignite.internal.network.netty.NettyUtils;
import org.apache.ignite.internal.network.netty.PipelineUtils;
import
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
+import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
import
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
import org.apache.ignite.lang.IgniteException;
@@ -52,6 +54,8 @@ import org.apache.ignite.network.OutNetworkObject;
public class RecoveryServerHandshakeManager implements HandshakeManager {
private static final IgniteLogger LOG =
Loggers.forClass(RecoveryServerHandshakeManager.class);
+ private static final int MAX_CLINCH_TERMINATION_AWAIT_ATTEMPTS = 1000;
+
/** Launch id. */
private final UUID launchId;
@@ -180,22 +184,7 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
}
if (message instanceof HandshakeRejectedMessage) {
- HandshakeRejectedMessage msg = (HandshakeRejectedMessage) message;
-
- boolean ignorable = stopping.get() || !msg.critical();
-
- if (ignorable) {
- LOG.debug("Handshake rejected by client: {}", msg.reason());
- } else {
- LOG.warn("Handshake rejected by client: {}", msg.reason());
- }
-
- handshakeCompleteFuture.completeExceptionally(new
HandshakeException(msg.reason()));
-
- if (!ignorable) {
- // TODO: IGNITE-16899 Perhaps we need to fail the node by
FailureHandler
- failureHandler.handleFailure(new IgniteException("Handshake
rejected by client: " + msg.reason()));
- }
+ onHandshakeRejectedMessage((HandshakeRejectedMessage) message);
return;
}
@@ -232,27 +221,84 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
this.receivedCount = remoteReceivedCount;
this.remoteChannelId = remoteChannelId;
+ tryAcquireDescriptorAndFinishHandshake();
+ }
+
+ private void handleStaleClientId(HandshakeStartResponseMessage msg) {
+ String message = msg.consistentId() + ":" + msg.launchId() + " is
stale, client should be restarted to be allowed to connect";
+ HandshakeRejectedMessage rejectionMessage =
messageFactory.handshakeRejectedMessage()
+ .reasonString(HandshakeRejectionReason.STALE_LAUNCH_ID.name())
+ .message(message)
+ .build();
+
+ sendHandshakeRejectedMessage(rejectionMessage, message);
+ }
+
+ private void
handleRefusalToEstablishConnectionDueToStopping(HandshakeStartResponseMessage
msg) {
+ String message = msg.consistentId() + ":" + msg.launchId() + " tried
to establish a connection with " + consistentId
+ + ", but it's stopping";
+ HandshakeRejectedMessage rejectionMessage =
messageFactory.handshakeRejectedMessage()
+ .reasonString(HandshakeRejectionReason.STOPPING.name())
+ .message(message)
+ .build();
+
+ sendHandshakeRejectedMessage(rejectionMessage, message);
+ }
+
+ private void tryAcquireDescriptorAndFinishHandshake() {
+ tryAcquireDescriptorAndFinishHandshake(0);
+ }
+
+ private void tryAcquireDescriptorAndFinishHandshake(int attempt) {
+ if (attempt > MAX_CLINCH_TERMINATION_AWAIT_ATTEMPTS) {
+ throw new IllegalStateException("Too many attempts during
handshake from " + remoteConsistentId + "(" + remoteLaunchId
+ + ":" + remoteChannelId + ") via " + ctx.channel());
+ }
+
RecoveryDescriptor descriptor =
recoveryDescriptorProvider.getRecoveryDescriptor(
this.remoteConsistentId,
this.remoteLaunchId,
this.remoteChannelId
);
- while (!descriptor.acquire(ctx)) {
+ while (!descriptor.acquire(ctx, handshakeCompleteFuture)) {
if (shouldCloseChannel(launchId, remoteLaunchId)) {
- Channel holderChannel = descriptor.holderChannel();
+ // A competitor is holding the descriptor and we win the
clinch; so we need to wait on the 'clinch resolved' future till
+ // the competitor realises it should terminate (this
realization will happen on the other side of the channel), send
+ // the corresponding message to this node, terminate its
handshake and complete the 'clinch resolved' future.
+ DescriptorAcquiry competitorAcquiry = descriptor.holder();
- if (holderChannel == null) {
+ if (competitorAcquiry == null) {
continue;
}
- holderChannel.close().awaitUninterruptibly();
+ competitorAcquiry.clinchResolved().whenComplete((res, ex) -> {
+ // The competitor has finished terminating its handshake,
it must've already released the descriptor,
+ // so let's try again.
+ if (ctx.executor().inEventLoop()) {
+ tryAcquireDescriptorAndFinishHandshake(attempt + 1);
+ } else {
+ ctx.executor().execute(() ->
tryAcquireDescriptorAndFinishHandshake(attempt + 1));
+ }
+ });
+
+ return;
} else {
- String err = "Failed to acquire recovery descriptor during
handshake, it is held by: " + descriptor.holderDescription();
+ // A competitor is holding the descriptor and we lose the
clinch; so we need to send the corresponding message
+ // to the other side, where the code handling the message will
terminate our handshake and complete the 'clinch resolved'
+ // future, making it possible for the competitor to proceed
and finish the handshake.
+ String localErrorMessage = "Failed to acquire recovery
descriptor during handshake, it is held by: "
+ + descriptor.holderDescription();
+
+ LOG.debug(localErrorMessage);
- LOG.info(err);
+ HandshakeRejectedMessage rejectionMessage =
messageFactory.handshakeRejectedMessage()
+ .reasonString(HandshakeRejectionReason.CLINCH.name())
+ .message("Handshake clinch detected, this handshake
will be terminated, winning channel is "
+ + descriptor.holderDescription())
+ .build();
- handshakeCompleteFuture.completeExceptionally(new
HandshakeException(err));
+ sendHandshakeRejectedMessage(rejectionMessage,
localErrorMessage);
return;
}
@@ -263,25 +309,21 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
handshake(descriptor);
}
- private void handleStaleClientId(HandshakeStartResponseMessage msg) {
- String reason = msg.consistentId() + ":" + msg.launchId() + " is
stale, client should be restarted to be allowed to connect";
- HandshakeRejectedMessage rejectionMessage =
messageFactory.handshakeRejectedMessage()
- .critical(true)
- .reason(reason)
- .build();
+ private void onHandshakeRejectedMessage(HandshakeRejectedMessage msg) {
+ boolean ignorable = stopping.get() || !msg.reason().critical();
- sendHandshakeRejectedMessage(rejectionMessage, reason);
- }
+ if (ignorable) {
+ LOG.debug("Handshake rejected by client: {}", msg.message());
+ } else {
+ LOG.warn("Handshake rejected by client: {}", msg.message());
+ }
- private void
handleRefusalToEstablishConnectionDueToStopping(HandshakeStartResponseMessage
msg) {
- String reason = msg.consistentId() + ":" + msg.launchId() + " tried to
establish a connection with " + consistentId
- + ", but it's stopping";
- HandshakeRejectedMessage rejectionMessage =
messageFactory.handshakeRejectedMessage()
- .critical(false)
- .reason(reason)
- .build();
+ handshakeCompleteFuture.completeExceptionally(new
HandshakeException(msg.message()));
- sendHandshakeRejectedMessage(rejectionMessage, reason);
+ if (!ignorable) {
+ // TODO: IGNITE-16899 Perhaps we need to fail the node by
FailureHandler
+ failureHandler.handleFailure(new IgniteException("Handshake
rejected by client: " + msg.message()));
+ }
}
private void sendHandshakeRejectedMessage(HandshakeRejectedMessage
rejectionMessage, String reason) {
@@ -353,7 +395,12 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
/** {@inheritDoc} */
@Override
- public CompletableFuture<NettySender> handshakeFuture() {
+ public CompletableFuture<NettySender> localHandshakeFuture() {
+ return handshakeCompleteFuture;
+ }
+
+ @Override
+ public CompletionStage<NettySender> finalHandshakeFuture() {
return handshakeCompleteFuture;
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
index 9871de3597..e83abeb8ef 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
@@ -24,21 +24,25 @@ import org.apache.ignite.network.annotations.Transferable;
/**
* Handshake rejected message, contains the reason for a rejection.
* This message is sent from a server to a client or vice versa.
- * After this message is received it makes no sense to retry connections with
same node identity (launch ID must be changed
- * to make a retry).
*/
@Transferable(HANDSHAKE_REJECTED)
public interface HandshakeRejectedMessage extends InternalMessage {
/**
- * Returns rejection reason.
+ * Returns rejection message.
+ */
+ String message();
+
+ /**
+ * Returns rejection reason (this is the name of a member of {@link
HandshakeRejectionReason}).
*
- * @return Reason of the rejection.
+ * @see HandshakeRejectionReason
*/
- String reason();
+ String reasonString();
/**
- * Returns {@code true} iff the rejection is not expected and should be
treated as a critical failure (requiring
- * the rejected node to restart).
+ * Returns rejection reason.
*/
- boolean critical();
+ default HandshakeRejectionReason reason() {
+ return HandshakeRejectionReason.valueOf(reasonString());
+ }
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectionReason.java
similarity index 61%
copy from
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectionReason.java
index 9871de3597..b70fea80aa 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectionReason.java
@@ -17,28 +17,26 @@
package org.apache.ignite.internal.network.recovery.message;
-import static
org.apache.ignite.internal.network.NetworkMessageTypes.HANDSHAKE_REJECTED;
-
-import org.apache.ignite.network.annotations.Transferable;
-
/**
- * Handshake rejected message, contains the reason for a rejection.
- * This message is sent from a server to a client or wise versa.
- * After this message is received it makes no sense to retry connections with
same node identity (launch ID must be changed
- * to make a retry).
+ * Reason for handshake rejection.
*/
-@Transferable(HANDSHAKE_REJECTED)
-public interface HandshakeRejectedMessage extends InternalMessage {
+public enum HandshakeRejectionReason {
+ /** The sender is stopping. */
+ STOPPING,
/**
- * Returns rejection reason.
- *
- * @return Reason of the rejection.
+ * The sender has detected that the counterpart launch ID is stale (was
earlier used to establish a connection).
+ * After this is received it makes no sense to retry connections with same
node identity (launch ID must be changed
+ * to make a retry).
*/
- String reason();
+ STALE_LAUNCH_ID,
+ /** The sender has detected a clinch and decided to terminate this
handshake in favor of the competitor. */
+ CLINCH;
/**
* Returns {@code true} iff the rejection is not expected and should be
treated as a critical failure (requiring
* the rejected node to restart).
*/
- boolean critical();
+ public boolean critical() {
+ return this == STALE_LAUNCH_ID;
+ }
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 6fb114ae90..b5f52fa336 100644
---
a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++
b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -28,7 +28,6 @@ import java.net.InetSocketAddress;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -37,12 +36,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.function.Function;
-import org.apache.ignite.internal.future.OrderingFuture;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
-import
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
import org.apache.ignite.internal.network.message.InvokeRequest;
import org.apache.ignite.internal.network.message.InvokeResponse;
@@ -298,24 +295,8 @@ public class DefaultMessagingService extends
AbstractMessagingService {
return failedFuture(new IgniteException("Failed to marshal
message: " + e.getMessage(), e));
}
- OrderingFuture<NettySender> channel =
connectionManager.channel(consistentId, type, addr);
-
- return channel.handle((sender, throwable) -> {
- if (throwable != null) {
- if (throwable instanceof CompletionException &&
throwable.getCause() instanceof ChannelAlreadyExistsException) {
- ChannelAlreadyExistsException e =
(ChannelAlreadyExistsException) throwable.getCause();
-
- OrderingFuture<NettySender> channelFut =
connectionManager.channel(e.consistentId(), type, addr);
-
- return channelFut.thenComposeToCompletable(nettySender -> {
- return nettySender.send(new OutNetworkObject(message,
descriptors));
- });
- }
-
- throw new CompletionException(throwable);
- }
- return sender.send(new OutNetworkObject(message, descriptors));
- }).thenComposeToCompletable(Function.identity());
+ return connectionManager.channel(consistentId, type, addr)
+ .thenComposeToCompletable(sender -> sender.send(new
OutNetworkObject(message, descriptors)));
}
private List<ClassDescriptorMessage> beforeRead(NetworkMessage msg) throws
Exception {
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index b407968e19..f1f69b9c4f 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -30,6 +30,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -267,10 +268,15 @@ public class NettyClientTest extends
BaseIgniteAbstractTest {
/** {@inheritDoc} */
@Override
- public CompletableFuture<NettySender> handshakeFuture() {
+ public CompletableFuture<NettySender> localHandshakeFuture() {
return CompletableFuture.completedFuture(sender);
}
+ @Override
+ public CompletionStage<NettySender> finalHandshakeFuture() {
+ return localHandshakeFuture();
+ }
+
/** {@inheritDoc} */
@Override
public void onInit(ChannelHandlerContext ctx) {
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index bf58a812d6..a8a8431933 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -149,7 +149,8 @@ public class NettyServerTest extends BaseIgniteAbstractTest
{
public void testHandshakeManagerInvoked() throws Exception {
HandshakeManager handshakeManager = mock(HandshakeManager.class);
-
when(handshakeManager.handshakeFuture()).thenReturn(CompletableFuture.completedFuture(mock(NettySender.class)));
+
when(handshakeManager.localHandshakeFuture()).thenReturn(CompletableFuture.completedFuture(mock(NettySender.class)));
+
when(handshakeManager.finalHandshakeFuture()).thenReturn(CompletableFuture.completedFuture(mock(NettySender.class)));
MessageSerializationRegistry registry =
mock(MessageSerializationRegistry.class);
@@ -218,7 +219,7 @@ public class NettyServerTest extends BaseIgniteAbstractTest
{
order.verify(handshakeManager, timeout()).onInit(any());
order.verify(handshakeManager, timeout()).onConnectionOpen();
- order.verify(handshakeManager, timeout()).handshakeFuture();
+ order.verify(handshakeManager, timeout()).localHandshakeFuture();
order.verify(handshakeManager, timeout()).onMessage(any());
}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index 305a43c2b2..4d4f9cc922 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -54,6 +54,8 @@ import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.OutNetworkObject;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
/**
* Recovery protocol handshake flow test.
@@ -62,6 +64,9 @@ public class RecoveryHandshakeTest {
/** Connection id. */
private static final short CONNECTION_ID = 1337;
+ private static final UUID LOWER_UUID = new UUID(100, 200);
+ private static final UUID HIGHER_UUID = new UUID(300, 400);
+
/** Serialization registry. */
private static final MessageSerializationRegistry MESSAGE_REGISTRY =
defaultSerializationRegistry();
@@ -209,12 +214,12 @@ public class RecoveryHandshakeTest {
}
@Test
- public void testPairedRecoveryDescriptors() throws Exception {
+ public void testPairedRecoveryDescriptorsClinch() throws Exception {
RecoveryDescriptorProvider node1Recovery =
createRecoveryDescriptorProvider();
RecoveryDescriptorProvider node2Recovery =
createRecoveryDescriptorProvider();
- UUID node1Uuid = new UUID(100, 200);
- UUID node2Uuid = new UUID(300, 400);
+ UUID node1Uuid = LOWER_UUID;
+ UUID node2Uuid = HIGHER_UUID;
RecoveryClientHandshakeManager chm1 =
createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
RecoveryServerHandshakeManager shm1 =
createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
@@ -222,14 +227,14 @@ public class RecoveryHandshakeTest {
RecoveryClientHandshakeManager chm2 =
createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
RecoveryServerHandshakeManager shm2 =
createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
- // Channel opened from node1 to node2 - channel 1.
- // Channel opened from node2 to node1 - channel 2.
+ // Channel opened from node1 to node2 is channel 1.
+ // Channel opened from node2 to node1 is channel 2.
// Channel 1.
EmbeddedChannel channel1Src = setupChannel(chm1, noMessageListener);
EmbeddedChannel channel1Dst = setupChannel(shm2, noMessageListener);
- // Channel 1.
+ // Channel 2.
EmbeddedChannel channel2Src = setupChannel(chm2, noMessageListener);
EmbeddedChannel channel2Dst = setupChannel(shm1, noMessageListener);
@@ -239,7 +244,8 @@ public class RecoveryHandshakeTest {
exchangeClientToServer(channel2Dst, channel2Src);
exchangeClientToServer(channel1Dst, channel1Src);
- // 2 -> 1 is alive, while 1 -> 2 closes because of the tie-breaking.
+ // 2 -> 1 (Channel 2) is alive, while 1 -> 2 (Channel 1) closes
because of the tie-breaking.
+ exchangeServerToClient(channel1Dst, channel1Src);
exchangeServerToClient(channel2Dst, channel2Src);
assertFalse(channel1Src.isOpen());
assertFalse(channel1Dst.isOpen());
@@ -253,6 +259,60 @@ public class RecoveryHandshakeTest {
assertFalse(channel1Dst.finish());
}
+ /**
+ * This tests the following scenario: two handshakes in the opposite
directions are started,
+ * Handshake 1 is faster and it takes both client-side and server-side
locks (using recovery descriptors
+ * as locks), and only then Handshake 2 tries to take the first lock (the
one on the client side).
+ * In such a situation, tie-breaking logic should not be applied (as
Handshake 1 could have already
+ * established, or almost established, a logical connection); instead,
Handshake 2 must stop
+ * itself (regardless of what the Tie Breaker would prescribe).
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testLateHandshakeDoesNotUseTieBreaker(boolean
node1LaunchIdIsLower) throws Exception {
+ RecoveryDescriptorProvider node1Recovery =
createRecoveryDescriptorProvider();
+ RecoveryDescriptorProvider node2Recovery =
createRecoveryDescriptorProvider();
+
+ UUID node1Uuid = node1LaunchIdIsLower ? LOWER_UUID : HIGHER_UUID;
+ UUID node2Uuid = node1LaunchIdIsLower ? HIGHER_UUID : LOWER_UUID;
+
+ RecoveryClientHandshakeManager chm1 =
createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
+ RecoveryServerHandshakeManager shm1 =
createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
+
+ RecoveryClientHandshakeManager chm2 =
createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
+ RecoveryServerHandshakeManager shm2 =
createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
+
+ // Channel opened from node1 to node2 is channel 1.
+ // Channel opened from node2 to node1 is channel 2.
+
+ // Channel 1.
+ EmbeddedChannel channel1Src = setupChannel(chm1, noMessageListener);
+ EmbeddedChannel channel1Dst = setupChannel(shm2, noMessageListener);
+
+ // Channel 2.
+ EmbeddedChannel channel2Src = setupChannel(chm2, noMessageListener);
+ EmbeddedChannel channel2Dst = setupChannel(shm1, noMessageListener);
+
+ // Channel 2's handshake acquires both locks.
+ exchangeServerToClient(channel2Dst, channel2Src);
+ exchangeClientToServer(channel2Dst, channel2Src);
+
+ // Now Channel 1's handshake cannot acquire even the first lock.
+ exchangeServerToClient(channel1Dst, channel1Src);
+
+ // 2 -> 1 is alive, while 1 -> 2 closes because it is late.
+ exchangeServerToClient(channel2Dst, channel2Src);
+ assertFalse(channel1Src.isOpen());
+
+ assertTrue(channel2Src.isOpen());
+ assertTrue(channel2Dst.isOpen());
+
+ assertFalse(channel1Src.finish());
+ assertFalse(channel2Dst.finish());
+ assertFalse(channel2Src.finish());
+ assertFalse(channel1Dst.finish());
+ }
+
@Test
public void testExactlyOnceServer() throws Exception {
testExactlyOnce(true);
@@ -464,25 +524,41 @@ public class RecoveryHandshakeTest {
}
private void checkHandshakeNotCompleted(HandshakeManager manager) {
- CompletableFuture<NettySender> handshakeFuture =
manager.handshakeFuture();
- assertFalse(handshakeFuture.isDone());
- assertFalse(handshakeFuture.isCompletedExceptionally());
- assertFalse(handshakeFuture.isCancelled());
+ CompletableFuture<NettySender> localHandshakeFuture =
manager.localHandshakeFuture();
+ assertFalse(localHandshakeFuture.isDone());
+ assertFalse(localHandshakeFuture.isCompletedExceptionally());
+ assertFalse(localHandshakeFuture.isCancelled());
+
+ CompletableFuture<NettySender> finalHandshakeFuture =
manager.finalHandshakeFuture().toCompletableFuture();
+ assertFalse(finalHandshakeFuture.isDone());
+ assertFalse(finalHandshakeFuture.isCompletedExceptionally());
+ assertFalse(finalHandshakeFuture.isCancelled());
}
private void checkHandshakeCompleted(HandshakeManager manager) {
- CompletableFuture<NettySender> handshakeFuture =
manager.handshakeFuture();
- assertTrue(handshakeFuture.isDone());
- assertFalse(handshakeFuture.isCompletedExceptionally());
- assertFalse(handshakeFuture.isCancelled());
+ CompletableFuture<NettySender> localHandshakeFuture =
manager.localHandshakeFuture();
+ assertTrue(localHandshakeFuture.isDone());
+ assertFalse(localHandshakeFuture.isCompletedExceptionally());
+ assertFalse(localHandshakeFuture.isCancelled());
+
+ CompletableFuture<NettySender> finalHandshakeFuture =
manager.finalHandshakeFuture().toCompletableFuture();
+ assertTrue(finalHandshakeFuture.isDone());
+ assertFalse(finalHandshakeFuture.isCompletedExceptionally());
+ assertFalse(finalHandshakeFuture.isCancelled());
}
private void checkHandshakeCompletedExceptionally(HandshakeManager
manager) {
- CompletableFuture<NettySender> handshakeFuture =
manager.handshakeFuture();
+ CompletableFuture<NettySender> handshakeFuture =
manager.localHandshakeFuture();
assertTrue(handshakeFuture.isDone());
assertTrue(handshakeFuture.isCompletedExceptionally());
assertFalse(handshakeFuture.isCancelled());
+
+ CompletableFuture<NettySender> finalHandshakeFuture =
manager.finalHandshakeFuture().toCompletableFuture();
+
+ assertTrue(finalHandshakeFuture.isDone());
+ assertTrue(finalHandshakeFuture.isCompletedExceptionally());
+ assertFalse(finalHandshakeFuture.isCancelled());
}
private void addUnacknowledgedMessages(RecoveryDescriptor
recoveryDescriptor) {
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiryTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiryTest.java
new file mode 100644
index 0000000000..349a34957a
--- /dev/null
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/DescriptorAcquiryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+
+import io.netty.channel.Channel;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DescriptorAcquiryTest extends BaseIgniteAbstractTest {
+ @Mock
+ private Channel channel;
+
+ private final CompletableFuture<NettySender> handshakeCompleteFuture = new
CompletableFuture<>();
+
+ @Test
+ void clinchResolvedStagedIsInitiallyIncomplete() {
+ DescriptorAcquiry acquiry = new DescriptorAcquiry(channel,
handshakeCompleteFuture);
+
+ assertThat(acquiry.clinchResolved().toCompletableFuture(),
is(not(completedFuture())));
+ }
+
+ @Test
+ void clinchGetsResolved() {
+ DescriptorAcquiry acquiry = new DescriptorAcquiry(channel,
handshakeCompleteFuture);
+
+ acquiry.markClinchResolved();
+
+ assertThat(acquiry.clinchResolved().toCompletableFuture(),
is(completedFuture()));
+ }
+}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
new file mode 100644
index 0000000000..f728b7b94b
--- /dev/null
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import static java.util.UUID.randomUUID;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.startsWith;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyShort;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.util.concurrent.EventExecutor;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
+import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.network.netty.ChannelCreationListener;
+import org.apache.ignite.internal.network.netty.NettySender;
+import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
+import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
+import
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+@Timeout(10)
+class RecoveryClientHandshakeManagerTest extends BaseIgniteAbstractTest {
+ private static final UUID LOWER_ID = new UUID(1, 1);
+ private static final UUID HIGHER_ID = new UUID(2, 2);
+
+ private static final String CLIENT_CONSISTENT_ID = "client";
+ private static final String SERVER_CONSISTENT_ID = "server";
+
+ private static final short CONNECTION_INDEX = 0;
+
+ private static final NetworkMessagesFactory MESSAGE_FACTORY = new
NetworkMessagesFactory();
+
+ @Mock
+ private Channel thisChannel;
+ @Mock
+ private Channel competitorChannel;
+
+ @Mock
+ private ChannelHandlerContext thisContext;
+ @Mock
+ private ChannelHandlerContext competitorContext;
+
+ @Mock
+ private ChannelCreationListener channelCreationListener;
+
+ @Mock
+ private RecoveryDescriptorProvider recoveryDescriptorProvider;
+
+ @Mock
+ private EventExecutor eventExecutor;
+
+ @Mock
+ private NettySender competitorNettySender;
+
+ private final RecoveryDescriptor recoveryDescriptor = new
RecoveryDescriptor(100);
+
+ @BeforeEach
+ void initMocks() {
+ lenient().when(thisContext.channel()).thenReturn(thisChannel);
+
lenient().when(competitorContext.channel()).thenReturn(competitorChannel);
+
+ lenient().when(thisContext.executor()).thenReturn(eventExecutor);
+ lenient().when(eventExecutor.inEventLoop()).thenReturn(true);
+
+ lenient().when(recoveryDescriptorProvider.getRecoveryDescriptor(any(),
any(), anyShort()))
+ .thenReturn(recoveryDescriptor);
+ }
+
+ /**
+ * This tests the following scenario: two handshakes in the opposite
directions are started,
+ * Handshake 1 is faster and it takes both client-side and server-side
locks (using recovery descriptors
+ * as locks), and only then Handshake 2 tries to take the first lock (the
one on the client side).
+ * In such a situation, tie-breaking logic should not be applied (as
Handshake 1 could have already
+ * established, or almost established, a logical connection); instead,
Handshake 2 must stop
+ * itself (regardless of what the Tie Breaker would prescribe).
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void terminatesCurrentHandshakeWhenCannotAcquireLockAtClientSide(boolean
clientLaunchIdIsLower) {
+ UUID clientLaunchId = clientLaunchIdIsLower ? LOWER_ID : HIGHER_ID;
+ UUID serverLaunchId = clientLaunchIdIsLower ? HIGHER_ID : LOWER_ID;
+
+ RecoveryClientHandshakeManager manager =
clientHandshakeManager(clientLaunchId);
+ CompletableFuture<NettySender> localHandshakeFuture =
manager.localHandshakeFuture();
+ CompletionStage<NettySender> finalHandshakeFuture =
manager.finalHandshakeFuture();
+
+ recoveryDescriptor.acquire(thisContext,
completedFuture(competitorNettySender));
+
+ manager.onMessage(handshakeStartMessageFrom(serverLaunchId));
+
+ verify(thisChannel, never()).close();
+ verify(thisChannel, never()).close(any(ChannelPromise.class));
+
+ HandshakeException ex = assertWillThrowFast(localHandshakeFuture,
HandshakeException.class);
+ assertThat(ex.getMessage(), is("Stepping aside to allow an incoming
handshake from server finish."));
+
+ assertThat(finalHandshakeFuture.toCompletableFuture(),
willCompleteSuccessfully());
+ assertThat(finalHandshakeFuture.toCompletableFuture().join(),
is(competitorNettySender));
+ }
+
+ private RecoveryClientHandshakeManager clientHandshakeManager(UUID
launchId) {
+ RecoveryClientHandshakeManager manager = new
RecoveryClientHandshakeManager(
+ launchId,
+ CLIENT_CONSISTENT_ID,
+ CONNECTION_INDEX,
+ recoveryDescriptorProvider,
+ new AllIdsAreFresh(),
+ channelCreationListener,
+ new AtomicBoolean(false)
+ );
+
+ manager.onInit(thisContext);
+
+ return manager;
+ }
+
+ private static HandshakeStartMessage handshakeStartMessageFrom(UUID
serverLaunchId) {
+ return MESSAGE_FACTORY.handshakeStartMessage()
+ .launchId(serverLaunchId)
+ .consistentId(SERVER_CONSISTENT_ID)
+ .build();
+ }
+
+ @Test
+ void
switchesToCompetitorFutureWhenRejectedDueToClinchAndCompetitorIsHere() {
+ RecoveryClientHandshakeManager manager =
clientHandshakeManager(randomUUID());
+ CompletableFuture<NettySender> localHandshakeFuture =
manager.localHandshakeFuture();
+ CompletionStage<NettySender> finalHandshakeFuture =
manager.finalHandshakeFuture();
+
+ recoveryDescriptor.acquire(thisContext, new CompletableFuture<>());
+
+ DescriptorAcquiry thisAcquiry = recoveryDescriptor.holder();
+ assertThat(thisAcquiry, notNullValue());
+ thisAcquiry.clinchResolved().whenComplete(((unused, ex) -> {
+ assertThat(recoveryDescriptor.acquire(competitorContext,
completedFuture(competitorNettySender)), is(true));
+ }));
+
+ manager.onMessage(handshakeRejectedMessageDueToClinchFrom());
+
+ HandshakeException ex = assertWillThrowFast(localHandshakeFuture,
HandshakeException.class);
+ assertThat(ex.getMessage(), startsWith("Stepping aside to allow an
incoming handshake from "));
+
+ assertThat(finalHandshakeFuture.toCompletableFuture(),
willCompleteSuccessfully());
+ assertThat(finalHandshakeFuture.toCompletableFuture().join(),
is(competitorNettySender));
+ }
+
+ @Test
+ void
finishesWithChannelAlreadyExistsExceptionWhenRejectedDueToClinchAndCompetitorIsNotHere()
{
+ RecoveryClientHandshakeManager manager =
clientHandshakeManager(randomUUID());
+ CompletableFuture<NettySender> localHandshakeFuture =
manager.localHandshakeFuture();
+ CompletionStage<NettySender> finalHandshakeFuture =
manager.finalHandshakeFuture();
+
+ recoveryDescriptor.acquire(thisContext, new CompletableFuture<>());
+
+ manager.onMessage(handshakeRejectedMessageDueToClinchFrom());
+
+ assertWillThrowFast(localHandshakeFuture,
ChannelAlreadyExistsException.class);
+ assertWillThrowFast(finalHandshakeFuture.toCompletableFuture(),
ChannelAlreadyExistsException.class);
+ }
+
+ private static HandshakeRejectedMessage
handshakeRejectedMessageDueToClinchFrom() {
+ return MESSAGE_FACTORY.handshakeRejectedMessage()
+ .reasonString(HandshakeRejectionReason.CLINCH.name())
+ .message("Rejected")
+ .build();
+ }
+}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorTest.java
new file mode 100644
index 0000000000..1261b637f0
--- /dev/null
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.lenient;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class RecoveryDescriptorTest extends BaseIgniteAbstractTest {
+ private final RecoveryDescriptor descriptor = new RecoveryDescriptor(100);
+
+ @Mock
+ private Channel channel1;
+ @Mock
+ private Channel channel2;
+
+ @Mock
+ private ChannelHandlerContext context1;
+ @Mock
+ private ChannelHandlerContext context2;
+
+ private final CompletableFuture<NettySender> handshakeCompleteFuture1 =
new CompletableFuture<>();
+ private final CompletableFuture<NettySender> handshakeCompleteFuture2 =
new CompletableFuture<>();
+
+ @BeforeEach
+ void setupMocks() {
+ lenient().when(context1.channel()).thenReturn(channel1);
+ lenient().when(context2.channel()).thenReturn(channel2);
+ }
+
+ @Test
+ void acquiresNonAcquiredDescriptor() {
+ assertTrue(descriptor.acquire(context1, handshakeCompleteFuture1));
+ }
+
+ @Test
+ void acquiryIsAbsentOnNonAcquiredDescriptor() {
+ assertThat(descriptor.holder(), is(nullValue()));
+ }
+
+ @Test
+ void acquiryInformationIsAvailabeAfterAcquiring() {
+ descriptor.acquire(context1, handshakeCompleteFuture1);
+
+ DescriptorAcquiry acquiry = descriptor.holder();
+ assertThat(acquiry, is(notNullValue()));
+ assertThat(acquiry.channel(), is(channel1));
+ assertThat(acquiry.clinchResolved().toCompletableFuture().isDone(),
is(false));
+ }
+
+ @Test
+ void cannotAcquireAcquiredDescriptor() {
+ descriptor.acquire(context1, handshakeCompleteFuture1);
+
+ assertFalse(descriptor.acquire(context2, handshakeCompleteFuture2));
+ assertThat(descriptor.holder().channel(), is(channel1));
+ }
+
+ @Test
+ void releaseMakesDescriptorAvailable() {
+ descriptor.acquire(context1, handshakeCompleteFuture1);
+
+ descriptor.release(context1);
+
+ assertTrue(descriptor.acquire(context1, handshakeCompleteFuture1));
+ DescriptorAcquiry acquiry = descriptor.holder();
+ assertThat(acquiry, is(notNullValue()));
+ assertThat(acquiry.channel(), is(channel1));
+ }
+
+ @Test
+ void releaseRemovesAcquiryInformation() {
+ descriptor.acquire(context1, handshakeCompleteFuture1);
+
+ descriptor.release(context1);
+
+ assertThat(descriptor.holder(), is(nullValue()));
+ }
+
+ @Test
+ void releaseWithAnotherContextHasNoEffect() {
+ descriptor.acquire(context1, handshakeCompleteFuture1);
+
+ descriptor.release(context2);
+
+ DescriptorAcquiry acquiry = descriptor.holder();
+ assertThat(acquiry, is(notNullValue()));
+ assertThat(acquiry.channel(), is(channel1));
+ assertThat(acquiry.clinchResolved().toCompletableFuture().isDone(),
is(false));
+
+ assertFalse(descriptor.acquire(context2, handshakeCompleteFuture2));
+
+ acquiry = descriptor.holder();
+ assertThat(acquiry, is(notNullValue()));
+ assertThat(acquiry.channel(), is(channel1));
+ assertThat(acquiry.clinchResolved().toCompletableFuture().isDone(),
is(false));
+ }
+
+ @Test
+ void releaseCompletesClinchReleasedStage() {
+ descriptor.acquire(context1, handshakeCompleteFuture1);
+ CompletionStage<Void> clinchResolved =
descriptor.holder().clinchResolved();
+
+ descriptor.release(context1);
+
+ assertThat(clinchResolved.toCompletableFuture(),
is(completedFuture()));
+ }
+}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
new file mode 100644
index 0000000000..9ad44b62cf
--- /dev/null
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import static
org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.startsWith;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyShort;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelProgressivePromise;
+import io.netty.util.concurrent.EventExecutor;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeException;
+import org.apache.ignite.internal.network.netty.ChannelCreationListener;
+import org.apache.ignite.internal.network.netty.NettySender;
+import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
+import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectionReason;
+import
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.network.OutNetworkObject;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class RecoveryServerHandshakeManagerTest extends BaseIgniteAbstractTest {
+ private static final UUID LOWER_ID = new UUID(1, 1);
+ private static final UUID HIGHER_ID = new UUID(2, 2);
+
+ private static final String CLIENT_CONSISTENT_ID = "client";
+ private static final String SERVER_CONSISTENT_ID = "server";
+
+ private static final short CONNECTION_INDEX = 0;
+
+ private static final NetworkMessagesFactory MESSAGE_FACTORY = new
NetworkMessagesFactory();
+
+ @Mock
+ private Channel channel;
+
+ @Mock
+ private ChannelHandlerContext context;
+
+ @Mock
+ private ChannelCreationListener channelCreationListener;
+
+ @Mock
+ private RecoveryDescriptorProvider recoveryDescriptorProvider;
+
+ @Mock
+ private EventExecutor eventExecutor;
+
+ @Captor
+ private ArgumentCaptor<OutNetworkObject> sentMessageCaptor;
+
+ private final RecoveryDescriptor recoveryDescriptor = new
RecoveryDescriptor(100);
+
+ @BeforeEach
+ void initMocks() {
+ lenient().when(context.channel()).thenReturn(channel);
+ lenient().when(channel.close()).thenAnswer(invocation -> {
+ recoveryDescriptor.release(context);
+ return mock(ChannelFuture.class);
+ });
+
lenient().when(recoveryDescriptorProvider.getRecoveryDescriptor(anyString(),
any(), anyShort()))
+ .thenReturn(recoveryDescriptor);
+
+ lenient().when(context.executor()).thenReturn(eventExecutor);
+ lenient().when(eventExecutor.inEventLoop()).thenReturn(true);
+
+ lenient().when(channel.writeAndFlush(any())).then(invocation -> {
+ DefaultChannelProgressivePromise future = new
DefaultChannelProgressivePromise(channel, eventExecutor);
+ future.setSuccess();
+ return future;
+ });
+ }
+
+ @Test
+ @Timeout(10)
+ void
terminatesCurrentHandshakeInClinchWhenOngoingHandshakeLosesDueToTieBreaking() {
+ UUID clientLaunchId = LOWER_ID;
+ UUID serverLaunchId = HIGHER_ID;
+
+ RecoveryServerHandshakeManager manager =
serverHandshakeManager(serverLaunchId);
+ CompletableFuture<NettySender> handshakeFuture =
manager.localHandshakeFuture();
+
+ recoveryDescriptor.acquire(context, new CompletableFuture<>());
+
+ manager.onMessage(handshakeStartResponseMessageFrom(clientLaunchId));
+
+ verify(channel, never()).close();
+ verify(channel, never()).close(any(ChannelPromise.class));
+
+ HandshakeException ex = assertWillThrowFast(handshakeFuture,
HandshakeException.class);
+ assertThat(ex.getMessage(), startsWith("Failed to acquire recovery
descriptor during handshake, it is held by: "));
+
+ verify(channel).writeAndFlush(sentMessageCaptor.capture());
+
+ OutNetworkObject outObject = sentMessageCaptor.getValue();
+ assertThat(outObject.shouldBeSavedForRecovery(), is(false));
+ assertThat(outObject.networkMessage(),
is(instanceOf(HandshakeRejectedMessage.class)));
+
+ HandshakeRejectedMessage rejectedMessage = (HandshakeRejectedMessage)
outObject.networkMessage();
+ assertThat(rejectedMessage.reason(),
is(HandshakeRejectionReason.CLINCH));
+ assertThat(
+ rejectedMessage.message(),
+ startsWith("Handshake clinch detected, this handshake will be
terminated, winning channel is ")
+ );
+ }
+
+ private RecoveryServerHandshakeManager serverHandshakeManager(UUID
launchId) {
+ RecoveryServerHandshakeManager manager = new
RecoveryServerHandshakeManager(
+ launchId,
+ SERVER_CONSISTENT_ID,
+ MESSAGE_FACTORY,
+ recoveryDescriptorProvider,
+ new AllIdsAreFresh(),
+ channelCreationListener,
+ new AtomicBoolean(false)
+ );
+
+ manager.onInit(context);
+
+ return manager;
+ }
+
+ private static HandshakeStartResponseMessage
handshakeStartResponseMessageFrom(UUID clientLaunchId) {
+ return MESSAGE_FACTORY.handshakeStartResponseMessage()
+ .launchId(clientLaunchId)
+ .consistentId(CLIENT_CONSISTENT_ID)
+ .connectionId(CONNECTION_INDEX)
+ .receivedCount(0)
+ .build();
+ }
+}