This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 527b7b27d5 IGNITE-19903 Fix recovery descriptor race condition (#2283)
527b7b27d5 is described below
commit 527b7b27d5cfeb705ff38a0f73fae0a6500ffcb7
Author: Semyon Danilov <[email protected]>
AuthorDate: Thu Jul 13 15:50:43 2023 +0400
IGNITE-19903 Fix recovery descriptor race condition (#2283)
---
.../ignite/internal/future/OrderingFuture.java | 63 +++++++++++++-
.../CompletableFutureCompletedMatcher.java | 61 ++++++++++++++
.../network/netty/ItConnectionManagerTest.java | 97 ++++++++++++++++++++++
.../scalecube/ItScaleCubeNetworkMessagingTest.java | 37 +++++++--
...ion.java => ChannelAlreadyExistsException.java} | 27 +++---
.../network/handshake/HandshakeException.java | 4 +-
.../ChannelCreationListener.java} | 26 ++----
.../internal/network/netty/ConnectionManager.java | 50 +++++------
.../netty/DefaultRecoveryDescriptorProvider.java | 14 +---
.../network/netty/InboundRecoveryHandler.java | 8 ++
.../ignite/internal/network/netty/NettySender.java | 9 +-
.../ignite/internal/network/netty/NettyServer.java | 8 --
.../network/recovery/HandshakeTieBreaker.java | 40 +++++++++
.../recovery/RecoveryClientHandshakeManager.java | 92 +++++++++++++++-----
.../network/recovery/RecoveryDescriptor.java | 46 ++++++++++
.../recovery/RecoveryDescriptorProvider.java | 3 +-
.../recovery/RecoveryServerHandshakeManager.java | 96 ++++++++++++++++-----
.../ignite/network/DefaultMessagingService.java | 26 +++++-
.../internal/network/netty/NettyServerTest.java | 8 +-
.../network/netty/RecoveryHandshakeTest.java | 53 +++++++-----
.../network/DefaultMessagingServiceTest.java | 3 +-
.../placementdriver/leases/LeaseTracker.java | 1 -
22 files changed, 592 insertions(+), 180 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java
index 6aa95c04ff..40b7821f07 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/future/OrderingFuture.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
import java.util.function.Function;
import org.jetbrains.annotations.Nullable;
@@ -299,8 +300,8 @@ public class OrderingFuture<T> {
private static <T> void acceptQuietly(BiConsumer<? super T, ? super
Throwable> action, T result, Throwable ex) {
try {
action.accept(result, ex);
- } catch (Exception e) {
- // ignore
+ } catch (Exception ignored) {
+ // No-op.
}
}
@@ -337,6 +338,41 @@ public class OrderingFuture<T> {
}
}
+ /**
+ * Adds a mapping function that gets executed as soon as this future gets
completed for any reason. The function will accept both result
+ * and exception and return a future with the result of the function's
execution.
+ *
+ * @param mapper The function to use to compute the value of the returned
OrderingFuture.
+ * @return The new OrderingFuture.
+ * @see CompletableFuture#handle(BiFunction)
+ */
+ public <U> OrderingFuture<U> handle(BiFunction<? super T, Throwable, ?
extends U> mapper) {
+ Handle<T, U> dependent = null;
+
+ while (true) {
+ State<T> prevState = state;
+
+ if (prevState.completionQueueProcessed()) {
+ try {
+ U mappingResult = mapper.apply(prevState.result,
prevState.exception);
+
+ return completedFuture(mappingResult);
+ } catch (Throwable t) {
+ return failedFuture(t);
+ }
+ }
+
+ if (dependent == null) {
+ dependent = new Handle<>(new OrderingFuture<>(), mapper);
+ }
+ State<T> newState = prevState.enqueueDependent(dependent);
+
+ if (replaceState(prevState, newState)) {
+ return dependent.resultFuture;
+ }
+ }
+ }
+
private static CompletionException wrapWithCompletionException(Throwable
ex) {
return ex instanceof CompletionException ? (CompletionException) ex :
new CompletionException(ex);
}
@@ -463,6 +499,25 @@ public class OrderingFuture<T> {
}
}
+ private static class Handle<T, U> implements DependentAction<T> {
+ private final OrderingFuture<U> resultFuture;
+ private final BiFunction<? super T, Throwable, ? extends U> action;
+
+ private Handle(OrderingFuture<U> resultFuture, BiFunction<? super T,
Throwable, ? extends U> action) {
+ this.resultFuture = resultFuture;
+ this.action = action;
+ }
+
+ @Override
+ public void onCompletion(@Nullable T result, @Nullable Throwable ex,
NotificationContext context) {
+ try {
+ resultFuture.complete(action.apply(result, ex));
+ } catch (Throwable t) {
+ resultFuture.completeExceptionally(t);
+ }
+ }
+ }
+
private static class ThenComposeToCompletable<T, U> implements
DependentAction<T>, BiConsumer<U, Throwable> {
private final CompletableFuture<U> resultFuture;
private final Function<? super T, ? extends CompletableFuture<U>>
mapper;
@@ -563,8 +618,8 @@ public class OrderingFuture<T> {
try {
node.dependent.onCompletion(result, exception, context);
- } catch (Exception e) {
- // ignore
+ } catch (Exception ignored) {
+ // No-op.
}
}
}
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureCompletedMatcher.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureCompletedMatcher.java
new file mode 100644
index 0000000000..c9047a5d67
--- /dev/null
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/CompletableFutureCompletedMatcher.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.matchers;
+
+import java.util.concurrent.CompletableFuture;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+/** A matcher that tests if a CompletableFuture has completed successfully. */
+public class CompletableFutureCompletedMatcher<T> extends
TypeSafeMatcher<CompletableFuture<T>> {
+ private CompletableFutureCompletedMatcher() {
+ }
+
+ @Override
+ protected boolean matchesSafely(CompletableFuture<T> future) {
+ return future.isDone() && !future.isCompletedExceptionally() &&
!future.isCancelled();
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("is a successfully completed
CompletableFuture");
+ }
+
+ @Override
+ public void describeMismatchSafely(CompletableFuture<T> item, Description
mismatchDescription) {
+ if (!item.isDone()) {
+ mismatchDescription.appendText("was not completed");
+ } else {
+ if (item.isCompletedExceptionally()) {
+ mismatchDescription.appendText("completed exceptionally");
+ } else if (item.isCancelled()) {
+ mismatchDescription.appendText("was cancelled");
+ } else {
+ // It might be successfully done now, but it wasn't at the
moment of matchesSafely execution.
+ mismatchDescription.appendText("was not completed");
+ }
+ }
+ }
+
+ /**
+ * Creates a {@link CompletableFutureCompletedMatcher}.
+ */
+ public static <T> CompletableFutureCompletedMatcher<T> completedFuture() {
+ return new CompletableFutureCompletedMatcher<T>();
+ }
+}
diff --git
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 9a50a77c68..7ea89fec58 100644
---
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -17,8 +17,11 @@
package org.apache.ignite.internal.network.netty;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
import static
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -37,6 +40,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -59,6 +63,7 @@ import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.OutNetworkObject;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -295,6 +300,95 @@ public class ItConnectionManagerTest {
server.close();
}
+ /**
+ * Tests that if two nodes are opening channels to each other, only one
channel survives.
+ *
+ * @throws Exception If failed.
+ */
+ @RepeatedTest(100)
+ public void testOneChannelLeftIfConnectToEachOther() throws Exception {
+ try (
+ ConnectionManagerWrapper manager1 = startManager(4000);
+ ConnectionManagerWrapper manager2 = startManager(4001)
+ ) {
+ CompletableFuture<NettySender> fut1 =
manager1.openChannelTo(manager2).toCompletableFuture();
+ CompletableFuture<NettySender> fut2 =
manager2.openChannelTo(manager1).toCompletableFuture();
+
+ NettySender sender1 = null;
+ NettySender sender2 = null;
+
+ try {
+ sender1 = fut1.get(1, TimeUnit.SECONDS);
+ } catch (Exception ignored) {
+ // No-op.
+ }
+ try {
+ sender2 = fut2.get(1, TimeUnit.SECONDS);
+ } catch (Exception ignored) {
+ // No-op.
+ }
+
+ NettySender highlander = null;
+
+ assertTrue(sender1 != null || sender2 != null);
+
+ if (sender1 != null && sender1.isOpen()) {
+ highlander = sender1;
+
+ boolean sender2NullOrClosed = sender2 == null ||
!sender2.isOpen();
+
+ assertTrue(sender2NullOrClosed);
+ }
+
+ if (sender2 != null && sender2.isOpen()) {
+ highlander = sender2;
+
+ boolean sender1NullOrClosed = sender1 == null ||
!sender1.isOpen();
+
+ assertTrue(sender1NullOrClosed);
+ }
+
+ assertNotNull(highlander);
+ assertTrue(highlander.isOpen());
+
+ assertTrue(
+ waitForCondition(
+ () -> manager1.channels().size() == 1 &&
manager2.channels().size() == 1,
+ TimeUnit.SECONDS.toMillis(1)
+ )
+ );
+
+ CompletableFuture<NettySender> channelFut1 =
manager1.connectionManager.channel(
+ manager2.connectionManager.consistentId(),
+ ChannelType.DEFAULT,
+ manager2.connectionManager.localAddress()
+ ).toCompletableFuture();
+
+ CompletableFuture<NettySender> channelFut2 =
manager2.connectionManager.channel(
+ manager1.connectionManager.consistentId(),
+ ChannelType.DEFAULT,
+ manager1.connectionManager.localAddress()
+ ).toCompletableFuture();
+
+ assertThat(channelFut1, is(completedFuture()));
+ assertThat(channelFut2, is(completedFuture()));
+
+ NettySender channel1 = channelFut1.getNow(null);
+ NettySender channel2 = channelFut2.getNow(null);
+
+ InetSocketAddress locAddr1 = (InetSocketAddress)
channel1.channel().localAddress();
+ InetSocketAddress remoteAddr1 = (InetSocketAddress)
channel1.channel().remoteAddress();
+
+ InetSocketAddress locAddr2 = (InetSocketAddress)
channel2.channel().localAddress();
+ InetSocketAddress remoteAddr2 = (InetSocketAddress)
channel2.channel().remoteAddress();
+
+ // Only compare ports because hosts may look different, e.g.
localhost and 0.0.0.0. They are technically not the same,
+ // although they are equivalent here.
+ assertEquals(locAddr1.getPort(), remoteAddr2.getPort());
+ assertEquals(locAddr2.getPort(), remoteAddr1.getPort());
+ }
+ }
+
/**
* Creates a mock {@link MessageSerializationRegistry} that throws an
exception when trying to get a serializer or a deserializer.
*/
@@ -383,5 +477,8 @@ public class ItConnectionManagerTest {
);
}
+ public Map<ConnectorKey<String>, NettySender> channels() {
+ return connectionManager.channels();
+ }
}
}
diff --git
a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index 6f5c970339..782725c95a 100644
---
a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++
b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.network.scalecube;
+import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
@@ -51,6 +52,7 @@ import
org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.messages.TestMessage;
import org.apache.ignite.internal.network.messages.TestMessageTypes;
import org.apache.ignite.internal.network.messages.TestMessagesFactory;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
import
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
import
org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
import
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
@@ -355,25 +357,44 @@ class ItScaleCubeNetworkMessagingTest {
}
private void knockOutNode(String outcastName) throws InterruptedException {
- CountDownLatch disappeared = new CountDownLatch(1);
+ CountDownLatch disappeared = new CountDownLatch(2);
- testCluster.members.get(0).topologyService().addEventHandler(new
TopologyEventHandler() {
+ TopologyEventHandler disappearListener = new TopologyEventHandler() {
@Override
public void onDisappeared(ClusterNode member) {
if (Objects.equals(member.name(), outcastName)) {
disappeared.countDown();
}
}
- });
+ };
- testCluster.members.stream()
+ List<ClusterService> notOutcasts = testCluster.members.stream()
.filter(service -> !outcastName.equals(service.nodeName()))
- .forEach(service -> {
- DefaultMessagingService messagingService =
(DefaultMessagingService) service.messagingService();
- messagingService.dropMessages((recipientConsistentId,
message) -> outcastName.equals(recipientConsistentId));
- });
+ .collect(toList());
+ notOutcasts.forEach(clusterService -> {
+
clusterService.topologyService().addEventHandler(disappearListener);
+ });
+
+ notOutcasts.forEach(service -> {
+ DefaultMessagingService messagingService =
(DefaultMessagingService) service.messagingService();
+ messagingService.dropMessages((recipientConsistentId, message) ->
outcastName.equals(recipientConsistentId));
+ });
+
+ // Wait until all nodes see disappearance of the outcast.
assertTrue(disappeared.await(10, TimeUnit.SECONDS), "Node did not
disappear in time");
+
+ DefaultMessagingService messagingService = (DefaultMessagingService)
testCluster.members.stream()
+ .filter(service -> outcastName.equals(service.nodeName()))
+ .findFirst()
+ .get().messagingService();
+
+ ConnectionManager cm = messagingService.connectionManager();
+
+ // Forcefully close channels, so that nodes will create new channels
on reanimation of the outcast.
+ cm.channels().forEach((stringConnectorKey, nettySender) -> {
+ nettySender.close().awaitUninterruptibly();
+ });
}
private IgniteBiTuple<CountDownLatch, AtomicBoolean> reanimateNode(String
outcastName) {
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/ChannelAlreadyExistsException.java
similarity index 61%
copy from
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/ChannelAlreadyExistsException.java
index 197f68438b..96640e4506 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/ChannelAlreadyExistsException.java
@@ -18,25 +18,22 @@
package org.apache.ignite.internal.network.handshake;
/**
- * Handshake exception.
+ * Exception signalling, during a handshake, that a channel to the node with the
given consistent id already exists.
*/
-public class HandshakeException extends Exception {
- /**
- * Constructor.
- *
- * @param message Handshake error message.
- */
- public HandshakeException(String message) {
- super(message);
+public class ChannelAlreadyExistsException extends RuntimeException {
+ private static final long serialVersionUID = 0L;
+
+ /** Consistent id of a remote node. */
+ private final String consistentId;
+
+ public ChannelAlreadyExistsException(String consistentId) {
+ this.consistentId = consistentId;
}
/**
- * Constructor.
- *
- * @param message Handshake error message.
- * @param cause Handshake error cause.
+ * Returns consistent id of the remote node with which a channel already
exists.
*/
- public HandshakeException(String message, Throwable cause) {
- super(message, cause);
+ public String consistentId() {
+ return consistentId;
}
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
index 197f68438b..39b8fe02da 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.network.handshake;
/**
* Handshake exception.
*/
-public class HandshakeException extends Exception {
+public class HandshakeException extends RuntimeException {
+ private static final long serialVersionUID = 0L;
+
/**
* Constructor.
*
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelCreationListener.java
similarity index 61%
copy from
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelCreationListener.java
index 197f68438b..efdab3ba24 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeException.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ChannelCreationListener.java
@@ -15,28 +15,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network.handshake;
-
-/**
- * Handshake exception.
- */
-public class HandshakeException extends Exception {
- /**
- * Constructor.
- *
- * @param message Handshake error message.
- */
- public HandshakeException(String message) {
- super(message);
- }
+package org.apache.ignite.internal.network.netty;
+/** Channel creation listener. */
+public interface ChannelCreationListener {
/**
- * Constructor.
+ * Notifies that the handshake for the given channel has finished.
*
- * @param message Handshake error message.
- * @param cause Handshake error cause.
+ * @param channel Opened channel.
*/
- public HandshakeException(String message, Throwable cause) {
- super(message, cause);
- }
+ void handshakeFinished(NettySender channel);
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 371c295e48..11988c6b4c 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -54,7 +54,7 @@ import org.jetbrains.annotations.TestOnly;
/**
* Class that manages connections both incoming and outgoing.
*/
-public class ConnectionManager {
+public class ConnectionManager implements ChannelCreationListener {
/** Message factory. */
private static final NetworkMessagesFactory FACTORY = new
NetworkMessagesFactory();
@@ -92,7 +92,7 @@ public class ConnectionManager {
private final StaleIdDetector staleIdDetector;
/** Factory producing {@link RecoveryClientHandshakeManager} instances. */
- private final RecoveryClientHandshakeManagerFactory
clientHandshakeManagerFactory;
+ private final @Nullable RecoveryClientHandshakeManagerFactory
clientHandshakeManagerFactory;
/** Start flag. */
private final AtomicBoolean started = new AtomicBoolean(false);
@@ -131,7 +131,7 @@ public class ConnectionManager {
consistentId,
bootstrapFactory,
staleIdDetector,
- new
DefaultRecoveryClientHandshakeManagerFactory(staleIdDetector)
+ null
);
}
@@ -153,7 +153,7 @@ public class ConnectionManager {
String consistentId,
NettyBootstrapFactory bootstrapFactory,
StaleIdDetector staleIdDetector,
- RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactory
+ @Nullable RecoveryClientHandshakeManagerFactory
clientHandshakeManagerFactory
) {
this.serializationService = serializationService;
this.launchId = launchId;
@@ -165,7 +165,6 @@ public class ConnectionManager {
this.server = new NettyServer(
networkConfiguration,
this::createServerHandshakeManager,
- this::onNewIncomingChannel,
this::onMessage,
serializationService,
bootstrapFactory
@@ -221,6 +220,7 @@ public class ConnectionManager {
* @return Sender.
*/
public OrderingFuture<NettySender> channel(@Nullable String consistentId,
ChannelType type, InetSocketAddress address) {
+ // The consistent id may not be known yet (it is nullable); a channel can only
be looked up by consistent id when it is known.
if (consistentId != null) {
// If consistent id is known, try looking up a channel by
consistent id. There can be an outbound connection
// or an inbound connection associated with that consistent id.
@@ -263,13 +263,12 @@ public class ConnectionManager {
*
* @param channel Channel from client to this {@link #server}.
*/
- private void onNewIncomingChannel(NettySender channel) {
+ @Override
+ public void handshakeFinished(NettySender channel) {
ConnectorKey<String> key = new ConnectorKey<>(channel.consistentId(),
getChannel(channel.channelId()));
NettySender oldChannel = channels.put(key, channel);
- if (oldChannel != null) {
- oldChannel.close();
- }
+ assert oldChannel == null : "Incorrect channel creation flow";
}
/**
@@ -288,10 +287,7 @@ public class ConnectionManager {
);
client.start(clientBootstrap).whenComplete((sender, throwable) -> {
- if (throwable == null) {
- ConnectorKey<String> key = new
ConnectorKey<>(sender.consistentId(), getChannel(sender.channelId()));
- channels.put(key, sender);
- } else {
+ if (throwable != null) {
clients.remove(new ConnectorKey<>(address, channelType));
}
});
@@ -344,6 +340,10 @@ public class ConnectionManager {
}
private HandshakeManager createClientHandshakeManager(short connectionId) {
+ if (clientHandshakeManagerFactory == null) {
+ return new RecoveryClientHandshakeManager(launchId, consistentId,
connectionId, descriptorProvider, staleIdDetector, this);
+ }
+
return clientHandshakeManagerFactory.create(
launchId,
consistentId,
@@ -353,7 +353,7 @@ public class ConnectionManager {
}
private HandshakeManager createServerHandshakeManager() {
- return new RecoveryServerHandshakeManager(launchId, consistentId,
FACTORY, descriptorProvider, staleIdDetector);
+ return new RecoveryServerHandshakeManager(launchId, consistentId,
FACTORY, descriptorProvider, staleIdDetector, this);
}
/**
@@ -391,22 +391,12 @@ public class ConnectionManager {
}
/**
- * Factory producing vanilla {@link RecoveryClientHandshakeManager}
instances.
+ * Returns an immutable snapshot of all channels of this connection manager.
+ *
+ * @return Immutable copy of the channel map of this connection manager.
*/
- private static class DefaultRecoveryClientHandshakeManagerFactory
implements RecoveryClientHandshakeManagerFactory {
- private final StaleIdDetector staleIdDetector;
-
- private DefaultRecoveryClientHandshakeManagerFactory(StaleIdDetector
staleIdDetector) {
- this.staleIdDetector = staleIdDetector;
- }
-
- @Override
- public RecoveryClientHandshakeManager create(UUID launchId,
- String consistentId,
- short connectionId,
- RecoveryDescriptorProvider recoveryDescriptorProvider
- ) {
- return new RecoveryClientHandshakeManager(launchId, consistentId,
connectionId, recoveryDescriptorProvider, staleIdDetector);
- }
+ @TestOnly
+ public Map<ConnectorKey<String>, NettySender> channels() {
+ return Map.copyOf(channels);
}
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
index e8ffa5b042..89a8f3b8ff 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
@@ -36,8 +36,8 @@ public class DefaultRecoveryDescriptorProvider implements
RecoveryDescriptorProv
/** {@inheritDoc} */
@Override
- public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID
launchId, short connectionIndex, boolean inbound) {
- var key = new ChannelKey(consistentId, launchId, connectionIndex,
inbound);
+ public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID
launchId, short connectionIndex) {
+ var key = new ChannelKey(consistentId, launchId, connectionIndex);
return recoveryDescriptors.computeIfAbsent(key, channelKey -> new
RecoveryDescriptor(DEFAULT_QUEUE_LIMIT));
}
@@ -56,14 +56,10 @@ public class DefaultRecoveryDescriptorProvider implements
RecoveryDescriptorProv
*/
private final short connectionId;
- /** {@code true} if channel is inbound, {@code false} otherwise. */
- private final boolean inbound;
-
- private ChannelKey(String consistentId, UUID launchId, short
connectionId, boolean inbound) {
+ private ChannelKey(String consistentId, UUID launchId, short
connectionId) {
this.consistentId = consistentId;
this.launchId = launchId;
this.connectionId = connectionId;
- this.inbound = inbound;
}
/** {@inheritDoc} */
@@ -81,9 +77,6 @@ public class DefaultRecoveryDescriptorProvider implements
RecoveryDescriptorProv
if (connectionId != that.connectionId) {
return false;
}
- if (inbound != that.inbound) {
- return false;
- }
if (!consistentId.equals(that.consistentId)) {
return false;
}
@@ -96,7 +89,6 @@ public class DefaultRecoveryDescriptorProvider implements
RecoveryDescriptorProv
int result = consistentId.hashCode();
result = 31 * result + launchId.hashCode();
result = 31 * result + connectionId;
- result = 31 * result + (inbound ? 1 : 0);
return result;
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java
index 348bde848c..92d50d1749 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java
@@ -25,6 +25,7 @@ import
org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
import
org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.OutNetworkObject;
+import org.jetbrains.annotations.NotNull;
/**
* Inbound handler that handles incoming acknowledgement messages and sends
acknowledgement messages for other messages.
@@ -69,4 +70,11 @@ public class InboundRecoveryHandler extends
ChannelInboundHandlerAdapter {
super.channelRead(ctx, message);
}
+
+ @Override
+ public void channelInactive(@NotNull ChannelHandlerContext ctx) throws
Exception {
+ descriptor.release(ctx);
+
+ super.channelInactive(ctx);
+ }
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
index 2294646d60..f6415cd4c8 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.network.netty;
import static
org.apache.ignite.internal.network.netty.NettyUtils.toCompletableFuture;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.handler.stream.ChunkedInput;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.network.direct.DirectMessageWriter;
@@ -94,10 +95,12 @@ public class NettySender {
}
/**
- * Closes channel.
+ * Requests closing of the channel and returns the future of the close operation.
+ *
+ * @return Future of the close operation on the {@link #channel} (as returned by {@link Channel#close()}).
*/
- public void close() {
- this.channel.close().awaitUninterruptibly();
+ public ChannelFuture close() {
+ return this.channel.close();
}
/**
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index 7242c25725..ba2d5ff73f 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -73,9 +73,6 @@ public class NettyServer {
@Nullable
private CompletableFuture<Void> serverCloseFuture;
- /** New connections listener. */
- private final Consumer<NettySender> newConnectionListener;
-
/** Flag indicating if {@link #stop()} has been called. */
private boolean stopped;
@@ -84,7 +81,6 @@ public class NettyServer {
*
* @param configuration Server configuration.
* @param handshakeManager Handshake manager supplier.
- * @param newConnectionListener New connections listener.
* @param messageListener Message listener.
* @param serializationService Serialization service.
* @param bootstrapFactory Netty bootstrap factory.
@@ -92,14 +88,12 @@ public class NettyServer {
public NettyServer(
NetworkView configuration,
Supplier<HandshakeManager> handshakeManager,
- Consumer<NettySender> newConnectionListener,
Consumer<InNetworkObject> messageListener,
SerializationService serializationService,
NettyBootstrapFactory bootstrapFactory
) {
this.configuration = configuration;
this.handshakeManager = handshakeManager;
- this.newConnectionListener = newConnectionListener;
this.messageListener = messageListener;
this.serializationService = serializationService;
this.bootstrapFactory = bootstrapFactory;
@@ -137,8 +131,6 @@ public class NettyServer {
} else {
PipelineUtils.setup(ch.pipeline(),
sessionSerializationService, manager, messageListener);
}
-
-
manager.handshakeFuture().thenAccept(newConnectionListener);
}
});
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java
new file mode 100644
index 0000000000..e3bf6018ef
--- /dev/null
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/HandshakeTieBreaker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.UUID;
+
+/**
+ * The HandshakeTieBreaker class provides a mechanism for determining whether
an existing channel should be closed during a handshake
+ * process.
+ */
+class HandshakeTieBreaker {
+ /**
+ * Determines whether an existing channel should be closed based on the
comparison of the server's launch id and the client's launch id.
+ * If the client's launch id is greater than the server's launch id, the
existing channel should be closed in favor of the new one. If
+ * the client's launch id is less than or equal to the server's launch id,
the existing channel should be kept rather than replaced by the new
+ * one.
+ *
+ * @param serverLaunchId Server's launch id.
+ * @param clientLaunchId Client's launch id.
+ * @return {@code true} if an existing channel should be closed, {@code
false} otherwise.
+ */
+ static boolean shouldCloseChannel(UUID serverLaunchId, UUID
clientLaunchId) {
+ return clientLaunchId.compareTo(serverLaunchId) > 0;
+ }
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index cefc0a16a1..53a00335a9 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -18,19 +18,21 @@
package org.apache.ignite.internal.network.recovery;
import static java.util.Collections.emptyList;
+import static
org.apache.ignite.internal.network.recovery.HandshakeTieBreaker.shouldCloseChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
-import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.netty.ChannelCreationListener;
import org.apache.ignite.internal.network.netty.HandshakeHandler;
import org.apache.ignite.internal.network.netty.MessageHandler;
import org.apache.ignite.internal.network.netty.NettySender;
@@ -104,13 +106,34 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
String consistentId,
short connectionId,
RecoveryDescriptorProvider recoveryDescriptorProvider,
- StaleIdDetector staleIdDetector
+ StaleIdDetector staleIdDetector,
+ ChannelCreationListener channelCreationListener
) {
this.launchId = launchId;
this.consistentId = consistentId;
this.connectionId = connectionId;
this.recoveryDescriptorProvider = recoveryDescriptorProvider;
this.staleIdDetector = staleIdDetector;
+
+ this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
+ if (throwable != null) {
+ releaseResources();
+
+ return;
+ }
+
+ channelCreationListener.handshakeFinished(nettySender);
+ });
+ }
+
+ private void releaseResources() {
+ assert ctx.executor().inEventLoop() : "Release resources called
outside of event loop";
+
+ RecoveryDescriptor desc = recoveryDescriptor;
+
+ if (desc != null) {
+ desc.release(ctx);
+ }
}
/** {@inheritDoc} */
@@ -125,25 +148,7 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
@Override
public void onMessage(NetworkMessage message) {
if (message instanceof HandshakeStartMessage) {
- HandshakeStartMessage msg = (HandshakeStartMessage) message;
-
- if (staleIdDetector.isIdStale(msg.launchId().toString())) {
- handleStaleServerId(msg);
-
- return;
- }
-
- this.remoteLaunchId = msg.launchId();
- this.remoteConsistentId = msg.consistentId();
-
- this.recoveryDescriptor =
recoveryDescriptorProvider.getRecoveryDescriptor(
- remoteConsistentId,
- remoteLaunchId,
- connectionId,
- false
- );
-
- handshake(recoveryDescriptor);
+ onHandshakeStartMessage((HandshakeStartMessage) message);
return;
}
@@ -197,8 +202,49 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
ctx.fireChannelRead(message);
}
+ private void onHandshakeStartMessage(HandshakeStartMessage message) {
+ if (staleIdDetector.isIdStale(message.launchId().toString())) {
+ handleStaleServerId(message);
+
+ return;
+ }
+
+ this.remoteLaunchId = message.launchId();
+ this.remoteConsistentId = message.consistentId();
+
+ RecoveryDescriptor descriptor =
recoveryDescriptorProvider.getRecoveryDescriptor(
+ remoteConsistentId,
+ remoteLaunchId,
+ connectionId
+ );
+
+ while (!descriptor.acquire(ctx)) {
+ if (shouldCloseChannel(remoteLaunchId, launchId)) {
+ Channel holderChannel = descriptor.holderChannel();
+
+ if (holderChannel == null) {
+ continue;
+ }
+
+ holderChannel.close().awaitUninterruptibly();
+ } else {
+ String err = "Failed to acquire recovery descriptor during
handshake, it is held by: " + descriptor.holderDescription();
+
+ LOG.info(err);
+
+ handshakeCompleteFuture.completeExceptionally(new
ChannelAlreadyExistsException(remoteConsistentId));
+
+ return;
+ }
+ }
+
+ this.recoveryDescriptor = descriptor;
+
+ handshake(this.recoveryDescriptor);
+ }
+
private void handleStaleServerId(HandshakeStartMessage msg) {
- String reason = msg.launchId() + " is stale, server should be
restarted so that clients can connect";
+ String reason = msg.consistentId() + ":" + msg.launchId() + " is
stale, server should be restarted so that clients can connect";
HandshakeRejectedMessage rejectionMessage =
MESSAGE_FACTORY.handshakeRejectedMessage()
.reason(reason)
.build();
@@ -232,7 +278,7 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
.connectionId(connectionId)
.build();
- ChannelFuture sendFuture = ctx.channel().writeAndFlush(new
OutNetworkObject(response, Collections.emptyList(), false));
+ ChannelFuture sendFuture = ctx.channel().writeAndFlush(new
OutNetworkObject(response, emptyList(), false));
NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused,
throwable) -> {
if (throwable != null) {
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
index 64e160183c..463f2ed49c 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
@@ -17,12 +17,16 @@
package org.apache.ignite.internal.network.recovery;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.network.OutNetworkObject;
+import org.jetbrains.annotations.Nullable;
/**
* Recovery protocol descriptor.
@@ -40,6 +44,9 @@ public class RecoveryDescriptor {
/** Count of received messages. */
private long receivedCount;
+ /** Current owner channel of this descriptor. */
+ private final AtomicReference<Channel> channelHolder = new
AtomicReference<>();
+
/**
* Constructor.
*
@@ -124,4 +131,43 @@ public class RecoveryDescriptor {
public String toString() {
return S.toString(RecoveryDescriptor.class, this);
}
+
+ /**
+ * Releases this descriptor if it is currently held by the channel of {@code ctx}; otherwise does nothing.
+ *
+ * @param ctx Channel handler context.
+ */
+ public void release(ChannelHandlerContext ctx) {
+ channelHolder.compareAndSet(ctx.channel(), null);
+ }
+
+ /**
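+ * Tries to acquire this descriptor for the channel of {@code ctx}; returns {@code true} on success, {@code false} if it is already held.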
+ *
+ * @param ctx Channel handler context.
+ */
+ public boolean acquire(ChannelHandlerContext ctx) {
+ return channelHolder.compareAndSet(null, ctx.channel());
+ }
+
+ /**
+ * Returns the channel that holds this descriptor, or {@code null} if it is not held.
+ */
+ @Nullable Channel holderChannel() {
+ return channelHolder.get();
+ }
+
+ /**
+ * Returns the {@code toString()} representation of the {@link Channel} that
holds this descriptor, or {@code "No channel"} if it is not held.
+ */
+ String holderDescription() {
+ Channel channel = channelHolder.get();
+
+ if (channel == null) {
+ // This can happen if channel was already closed and it released
the descriptor.
+ return "No channel";
+ }
+
+ return channel.toString();
+ }
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorProvider.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorProvider.java
index b46a29ba2b..5603e18009 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorProvider.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorProvider.java
@@ -29,8 +29,7 @@ public interface RecoveryDescriptorProvider {
* @param consistentId Remote node consistent id.
* @param launchId Remote node launch id.
* @param connectionIndex Connection id.
- * @param inbound {@code true} if the connection is inbound, {@code false}
otherwise.
* @return Recovery descriptor.
*/
- RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID
launchId, short connectionIndex, boolean inbound);
+ RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID
launchId, short connectionIndex);
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index 395de002b7..234f508dd0 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.network.recovery;
import static java.util.Collections.emptyList;
+import static
org.apache.ignite.internal.network.recovery.HandshakeTieBreaker.shouldCloseChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.netty.ChannelCreationListener;
import org.apache.ignite.internal.network.netty.HandshakeHandler;
import org.apache.ignite.internal.network.netty.MessageHandler;
import org.apache.ignite.internal.network.netty.NettySender;
@@ -106,13 +108,34 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
String consistentId,
NetworkMessagesFactory messageFactory,
RecoveryDescriptorProvider recoveryDescriptorProvider,
- StaleIdDetector staleIdDetector
+ StaleIdDetector staleIdDetector,
+ ChannelCreationListener channelCreationListener
) {
this.launchId = launchId;
this.consistentId = consistentId;
this.messageFactory = messageFactory;
this.recoveryDescriptorProvider = recoveryDescriptorProvider;
this.staleIdDetector = staleIdDetector;
+
+ this.handshakeCompleteFuture.whenComplete((nettySender, throwable) -> {
+ if (throwable != null) {
+ releaseResources();
+
+ return;
+ }
+
+ channelCreationListener.handshakeFinished(nettySender);
+ });
+ }
+
+ private void releaseResources() {
+ assert ctx.executor().inEventLoop() : "Release resources called
outside of event loop";
+
+ RecoveryDescriptor desc = recoveryDescriptor;
+
+ if (desc != null) {
+ desc.release(ctx);
+ }
}
/** {@inheritDoc} */
@@ -146,26 +169,7 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
@Override
public void onMessage(NetworkMessage message) {
if (message instanceof HandshakeStartResponseMessage) {
- HandshakeStartResponseMessage msg =
(HandshakeStartResponseMessage) message;
-
- if (staleIdDetector.isIdStale(msg.launchId().toString())) {
- handleStaleClientId(msg);
-
- return;
- }
-
- this.remoteLaunchId = msg.launchId();
- this.remoteConsistentId = msg.consistentId();
- this.receivedCount = msg.receivedCount();
- this.remoteChannelId = msg.connectionId();
-
- this.recoveryDescriptor =
recoveryDescriptorProvider.getRecoveryDescriptor(
- remoteConsistentId,
- remoteLaunchId,
- remoteChannelId,
- true);
-
- handshake(recoveryDescriptor);
+ onHandshakeStartResponseMessage((HandshakeStartResponseMessage)
message);
return;
}
@@ -192,8 +196,56 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
ctx.fireChannelRead(message);
}
+ private void onHandshakeStartResponseMessage(HandshakeStartResponseMessage
message) {
+ UUID remoteLaunchId = message.launchId();
+ String remoteConsistentId = message.consistentId();
+ long remoteReceivedCount = message.receivedCount();
+ short remoteChannelId = message.connectionId();
+
+ if (staleIdDetector.isIdStale(remoteLaunchId.toString())) {
+ handleStaleClientId(message);
+
+ return;
+ }
+
+ this.remoteLaunchId = remoteLaunchId;
+ this.remoteConsistentId = remoteConsistentId;
+ this.receivedCount = remoteReceivedCount;
+ this.remoteChannelId = remoteChannelId;
+
+ RecoveryDescriptor descriptor =
recoveryDescriptorProvider.getRecoveryDescriptor(
+ this.remoteConsistentId,
+ this.remoteLaunchId,
+ this.remoteChannelId
+ );
+
+ while (!descriptor.acquire(ctx)) {
+ if (shouldCloseChannel(launchId, remoteLaunchId)) {
+ Channel holderChannel = descriptor.holderChannel();
+
+ if (holderChannel == null) {
+ continue;
+ }
+
+ holderChannel.close().awaitUninterruptibly();
+ } else {
+ String err = "Failed to acquire recovery descriptor during
handshake, it is held by: " + descriptor.holderDescription();
+
+ LOG.info(err);
+
+ handshakeCompleteFuture.completeExceptionally(new
HandshakeException(err));
+
+ return;
+ }
+ }
+
+ this.recoveryDescriptor = descriptor;
+
+ handshake(descriptor);
+ }
+
private void handleStaleClientId(HandshakeStartResponseMessage msg) {
- String reason = msg.launchId() + " is stale, client should be
restarted to be allowed to connect";
+ String reason = msg.consistentId() + ":" + msg.launchId() + " is
stale, client should be restarted to be allowed to connect";
HandshakeRejectedMessage rejectionMessage =
messageFactory.handshakeRejectedMessage()
.reason(reason)
.build();
diff --git
a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 7ca5811863..d71b8d641b 100644
---
a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++
b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -28,6 +28,7 @@ import java.net.InetSocketAddress;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -40,6 +41,7 @@ import org.apache.ignite.internal.future.OrderingFuture;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
import org.apache.ignite.internal.network.message.InvokeRequest;
import org.apache.ignite.internal.network.message.InvokeResponse;
@@ -282,8 +284,23 @@ public class DefaultMessagingService extends
AbstractMessagingService {
}
OrderingFuture<NettySender> channel =
connectionManager.channel(consistentId, type, addr);
- return channel
- .thenComposeToCompletable(sender -> sender.send(new
OutNetworkObject(message, descriptors)));
+
+ return channel.handle((sender, throwable) -> {
+ if (throwable != null) {
+ if (throwable instanceof CompletionException &&
throwable.getCause() instanceof ChannelAlreadyExistsException) {
+ ChannelAlreadyExistsException e =
(ChannelAlreadyExistsException) throwable.getCause();
+
+ OrderingFuture<NettySender> channelFut =
connectionManager.channel(e.consistentId(), type, addr);
+
+ return channelFut.thenComposeToCompletable(nettySender -> {
+ return nettySender.send(new OutNetworkObject(message,
descriptors));
+ });
+ }
+
+ throw new CompletionException(throwable);
+ }
+ return sender.send(new OutNetworkObject(message, descriptors));
+ }).thenComposeToCompletable(Function.identity());
}
private List<ClassDescriptorMessage> beforeRead(NetworkMessage msg) throws
Exception {
@@ -490,4 +507,9 @@ public class DefaultMessagingService extends
AbstractMessagingService {
public void stopDroppingMessages() {
dropMessagesPredicate = null;
}
+
+ @TestOnly
+ public ConnectionManager connectionManager() {
+ return connectionManager;
+ }
}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index bfad033576..ff9888edea 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -179,10 +179,7 @@ public class NettyServerTest {
server = new NettyServer(
serverCfg.value(),
() -> handshakeManager,
- sender -> {
- },
- (message) -> {
- },
+ (message) -> {},
new SerializationService(registry,
mock(UserObjectSerializationContext.class)),
bootstrapFactory
);
@@ -219,8 +216,8 @@ public class NettyServerTest {
InOrder order = Mockito.inOrder(handshakeManager);
order.verify(handshakeManager, timeout()).onInit(any());
- order.verify(handshakeManager, timeout()).handshakeFuture();
order.verify(handshakeManager, timeout()).onConnectionOpen();
+ order.verify(handshakeManager, timeout()).handshakeFuture();
order.verify(handshakeManager, timeout()).onMessage(any());
}
@@ -248,7 +245,6 @@ public class NettyServerTest {
() -> mock(HandshakeManager.class),
null,
null,
- null,
bootstrapFactory
);
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index 31eabb3539..a83aa0975c 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -21,7 +21,6 @@ import static
org.apache.ignite.utils.ClusterServiceTestUtils.defaultSerializati
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -109,7 +108,7 @@ public class RecoveryHandshakeTest {
RecoveryDescriptorProvider serverRecovery =
createRecoveryDescriptorProvider();
UUID clientLaunchId = UUID.randomUUID();
- RecoveryDescriptor serverRecoveryDescriptor =
serverRecovery.getRecoveryDescriptor("client", clientLaunchId, CONNECTION_ID,
true);
+ RecoveryDescriptor serverRecoveryDescriptor =
serverRecovery.getRecoveryDescriptor("client", clientLaunchId, CONNECTION_ID);
addUnacknowledgedMessages(serverRecoveryDescriptor);
RecoveryClientHandshakeManager clientHandshakeManager =
createRecoveryClientHandshakeManager("client", clientLaunchId,
@@ -162,7 +161,7 @@ public class RecoveryHandshakeTest {
RecoveryDescriptorProvider serverRecovery =
createRecoveryDescriptorProvider();
UUID serverLaunchId = UUID.randomUUID();
- RecoveryDescriptor clientRecoveryDescriptor =
clientRecovery.getRecoveryDescriptor("server", serverLaunchId, CONNECTION_ID,
false);
+ RecoveryDescriptor clientRecoveryDescriptor =
clientRecovery.getRecoveryDescriptor("server", serverLaunchId, CONNECTION_ID);
addUnacknowledgedMessages(clientRecoveryDescriptor);
RecoveryClientHandshakeManager clientHandshakeManager =
createRecoveryClientHandshakeManager(clientRecovery);
@@ -214,8 +213,8 @@ public class RecoveryHandshakeTest {
RecoveryDescriptorProvider node1Recovery =
createRecoveryDescriptorProvider();
RecoveryDescriptorProvider node2Recovery =
createRecoveryDescriptorProvider();
- UUID node1Uuid = UUID.randomUUID();
- UUID node2Uuid = UUID.randomUUID();
+ UUID node1Uuid = new UUID(100, 200);
+ UUID node2Uuid = new UUID(300, 400);
RecoveryClientHandshakeManager chm1 =
createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
RecoveryServerHandshakeManager shm1 =
createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
@@ -223,27 +222,35 @@ public class RecoveryHandshakeTest {
RecoveryClientHandshakeManager chm2 =
createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
RecoveryServerHandshakeManager shm2 =
createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
- EmbeddedChannel out1to2 = setupChannel(chm1, noMessageListener);
- EmbeddedChannel in1to2 = setupChannel(shm1, noMessageListener);
- EmbeddedChannel out2to1 = setupChannel(chm2, noMessageListener);
- EmbeddedChannel in2to1 = setupChannel(shm2, noMessageListener);
+ // Channel opened from node1 to node2 - channel 1.
+ // Channel opened from node2 to node1 - channel 2.
- exchangeServerToClient(in1to2, out2to1);
- exchangeServerToClient(in2to1, out1to2);
+ // Channel 1.
+ EmbeddedChannel channel1Src = setupChannel(chm1, noMessageListener);
+ EmbeddedChannel channel1Dst = setupChannel(shm2, noMessageListener);
- exchangeClientToServer(in1to2, out2to1);
- exchangeClientToServer(in2to1, out1to2);
+ // Channel 2.
+ EmbeddedChannel channel2Src = setupChannel(chm2, noMessageListener);
+ EmbeddedChannel channel2Dst = setupChannel(shm1, noMessageListener);
- exchangeServerToClient(in1to2, out2to1);
- exchangeServerToClient(in2to1, out1to2);
+ exchangeServerToClient(channel2Dst, channel2Src);
+ exchangeServerToClient(channel1Dst, channel1Src);
- assertNotSame(chm1.recoveryDescriptor(), shm1.recoveryDescriptor());
- assertNotSame(chm2.recoveryDescriptor(), shm2.recoveryDescriptor());
+ exchangeClientToServer(channel2Dst, channel2Src);
+ exchangeClientToServer(channel1Dst, channel1Src);
- assertFalse(out1to2.finish());
- assertFalse(in1to2.finish());
- assertFalse(out2to1.finish());
- assertFalse(in2to1.finish());
+ // 2 -> 1 is alive, while 1 -> 2 closes because of the tie-breaking.
+ exchangeServerToClient(channel2Dst, channel2Src);
+ assertFalse(channel1Src.isOpen());
+ assertFalse(channel1Dst.isOpen());
+
+ assertTrue(channel2Src.isOpen());
+ assertTrue(channel2Dst.isOpen());
+
+ assertFalse(channel1Src.finish());
+ assertFalse(channel2Dst.finish());
+ assertFalse(channel2Src.finish());
+ assertFalse(channel1Dst.finish());
}
@Test
@@ -534,7 +541,7 @@ public class RecoveryHandshakeTest {
private RecoveryClientHandshakeManager
createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
RecoveryDescriptorProvider provider, StaleIdDetector
staleIdDetector) {
- return new RecoveryClientHandshakeManager(launchId, consistentId,
CONNECTION_ID, provider, staleIdDetector);
+ return new RecoveryClientHandshakeManager(launchId, consistentId,
CONNECTION_ID, provider, staleIdDetector, channel -> {});
}
private RecoveryServerHandshakeManager
createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
@@ -548,7 +555,7 @@ public class RecoveryHandshakeTest {
private RecoveryServerHandshakeManager
createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
RecoveryDescriptorProvider provider, StaleIdDetector
staleIdDetector) {
- return new RecoveryServerHandshakeManager(launchId, consistentId,
MESSAGE_FACTORY, provider, staleIdDetector);
+ return new RecoveryServerHandshakeManager(launchId, consistentId,
MESSAGE_FACTORY, provider, staleIdDetector, channel -> {});
}
private RecoveryDescriptorProvider createRecoveryDescriptorProvider() {
diff --git
a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
index 7a2327617a..cb1736b073 100644
---
a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -325,7 +325,8 @@ class DefaultMessagingServiceTest {
consistentId,
connectionId,
recoveryDescriptorProvider,
- staleIdDetector
+ staleIdDetector,
+ channel -> {}
) {
@Override
protected void finishHandshake() {
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index ab5091934d..a3a707c84a 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -243,7 +243,6 @@ public class LeaseTracker implements PlacementDriver {
return completedFuture(lease);
}
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19532 Race
between meta storage safe time publication and listeners.
return
msManager.clusterTime().waitFor(timestamp).thenApply(ignored -> inBusyLock(
busyLock, () -> {
Lease lease0 =
leasesMap.getOrDefault(replicationGroupId, EMPTY_LEASE);