This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d873ebabc6 IGNITE-18712 Do not allow a node excluded from Physical
Topology to enter the topology again (#1978)
d873ebabc6 is described below
commit d873ebabc67a4b98c7474fc37303432009ebe68a
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Sun Apr 30 13:51:09 2023 +0400
IGNITE-18712 Do not allow a node excluded from Physical Topology to enter
the topology again (#1978)
---
build.gradle | 5 +
gradle/libs.versions.toml | 6 +-
.../org/apache/ignite/network/ClusterNode.java | 5 +-
.../management/ClusterManagementGroupManager.java | 30 +----
.../impl/ItMetaStorageMultipleNodesTest.java | 33 ------
modules/network/build.gradle | 1 +
.../network/netty/ItConnectionManagerTest.java | 4 +-
.../scalecube/ItScaleCubeNetworkMessagingTest.java | 100 ++++++++++++++++-
.../internal/network/NetworkMessageTypes.java | 14 ++-
.../internal/network/netty/ConnectionManager.java | 24 +++-
.../netty/DefaultRecoveryDescriptorProvider.java | 2 +-
.../internal/network/netty/HandshakeHandler.java | 8 +-
.../internal/network/recovery/FailureHandler.java | 33 ++++++
.../network/recovery/InMemoryStaleIds.java | 39 +++++++
.../recovery/RecoveryClientHandshakeManager.java | 56 +++++++++-
.../recovery/RecoveryServerHandshakeManager.java | 60 +++++++++-
.../internal/network/recovery/StaleIdDetector.java | 33 ++++++
.../ignite/internal/network/recovery/StaleIds.java | 32 ++++++
.../internal/network/recovery/VaultStateIds.java | 104 +++++++++++++++++
.../recovery/message/HandshakeRejectedMessage.java | 38 +++++++
.../scalecube/ScaleCubeClusterServiceFactory.java | 23 +++-
.../network/netty/RecoveryHandshakeTest.java | 98 +++++++++++++++-
.../network/recovery/VaultStateIdsTest.java | 124 +++++++++++++++++++++
.../network/DefaultMessagingServiceTest.java | 25 ++++-
.../internal/network/recovery/AllIdsAreFresh.java | 28 +++++
.../internal/network/recovery/AllIdsAreStale.java | 28 +++++
.../TestScaleCubeClusterServiceFactory.java | 1 -
.../ignite/utils/ClusterServiceTestUtils.java | 34 +++++-
.../raft/ItTruncateSuffixAndRestartTest.java | 4 +-
.../internal/compute/ItLogicalTopologyTest.java | 5 +-
.../runner/app/ItIgniteNodeRestartTest.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 4 +-
...ItTxDistributedTestThreeNodesThreeReplicas.java | 11 +-
33 files changed, 907 insertions(+), 109 deletions(-)
diff --git a/build.gradle b/build.gradle
index 30c5f9166f..0b18987366 100644
--- a/build.gradle
+++ b/build.gradle
@@ -119,6 +119,11 @@ subprojects {
releasesOnly()
}
}
+
+ maven {
+ // TODO: IGNITE-19386 - switch to io.scalecube:scalecube-cluster
+ url "https://jitpack.io"
+ }
}
tasks.register('printSubDependencies', DependencyReportTask)
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index d4ed85141c..aab1b13919 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -50,7 +50,8 @@ typesafe = "1.4.1"
hamcrest = "2.2"
hamcrestOptional = "2.0.0"
hamcrestPath = "1.0.1"
-scalecube = "2.6.12"
+# TODO: IGNITE-19386 - switch to io.scalecube:scalecube-cluster
+scalecube = "2.6.12-patch5"
calcite = "1.34.0"
value = "2.8.8"
janino = "3.1.9"
@@ -194,7 +195,8 @@ spoon-core = { module = "fr.inria.gforge.spoon:spoon-core",
version.ref = "spoon
fastutil-core = { module = "it.unimi.dsi:fastutil-core", version.ref =
"fastutil" }
-scalecube-cluster = { module = "io.scalecube:scalecube-cluster", version.ref =
"scalecube" }
+# TODO: IGNITE-19386 - switch to io.scalecube:scalecube-cluster
+scalecube-cluster = { module =
"com.github.rpuch.scalecube-cluster-parent:scalecube-cluster", version.ref =
"scalecube" }
kryo = { module = "com.esotericsoftware:kryo", version.ref = "kryo" }
diff --git
a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
index 4d4019132a..dcbe92017f 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
@@ -61,10 +61,7 @@ public class ClusterNode implements Serializable {
* @param address Node address.
*/
public ClusterNode(String id, String name, NetworkAddress address) {
- this.id = id;
- this.name = name;
- this.address = address;
- this.nodeMetadata = null;
+ this(id, name, address, null);
}
/**
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 155b100a17..93a4bf7352 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -138,20 +138,6 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
/** Node's attributes configuration. */
private final NodeAttributesConfiguration nodeAttributes;
- /**
- * Whether we attempted to complete join (i.e. send JoinReady command) on
Ignite node start.
- *
- * <p>Such join completion always happens during a start, and it is always
the last step during the startup process,
- * to make sure a node joins the cluster when it's fully ready.
- *
- * <p>We need this flag to make sure we handle automatic rejoins
correctly. If a short network hiccup happens, CMG leader
- * might lose our node of sight, hence the node will be removed from
physical and then from logical topologies. When the network
- * connectivity is restored, the node will appear in the physical
topology, after which it will try to rejoin the cluster. If such
- * 'rejoin' was carried out unconditionally, it could happen before the
first join during startup, so a not-yet-ready node could join
- * the cluster.
- */
- private volatile boolean attemptedCompleteJoinOnStart = false;
-
/** Constructor. */
public ClusterManagementGroupManager(
VaultManager vault,
@@ -522,7 +508,7 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
if (service != null &&
service.nodeNames().equals(state.cmgNodes())) {
LOG.info("ClusterStateMessage received, but
the CMG service is already started");
- return joinCluster(service,
state.clusterTag());
+ return completedFuture(service);
}
if (service == null) {
@@ -550,21 +536,11 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
return initCmgRaftService(state);
})
- .thenCompose(Function.identity())
- .thenCompose(this::completeJoinIfTryingToRejoin);
+ .thenCompose(Function.identity());
}
}
}
- private CompletableFuture<CmgRaftService>
completeJoinIfTryingToRejoin(CmgRaftService cmgRaftService) {
- if (attemptedCompleteJoinOnStart) {
- return cmgRaftService.completeJoinCluster(mapNodeAttributes())
- .thenApply(unused -> cmgRaftService);
- } else {
- return completedFuture(cmgRaftService);
- }
- }
-
private CompletableFuture<CmgRaftService> joinCluster(CmgRaftService
service, ClusterTag clusterTag) {
return service.startJoinCluster(clusterTag, mapNodeAttributes())
.thenApply(v -> service)
@@ -860,8 +836,6 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
return failedFuture(new NodeStoppingException());
}
- attemptedCompleteJoinOnStart = true;
-
try {
return raftServiceAfterJoin().thenCompose(svc ->
svc.completeJoinCluster(mapNodeAttributes()));
} finally {
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
index 41c9953828..a094d4f880 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
@@ -61,7 +61,6 @@ import org.apache.ignite.internal.metastorage.WatchListener;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl;
-import org.apache.ignite.internal.network.message.ScaleCubeMessage;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftNodeId;
@@ -355,38 +354,6 @@ public class ItMetaStorageMultipleNodesTest extends
IgniteAbstractTest {
assertTrue(waitForCondition(() ->
firstNode.getMetaStorageLearners().join().isEmpty(), 10_000));
}
- /**
- * Tests a scenario when a node gets kicked out of the Logical Topology
due to a network partition. It should then be able to join
- * the Meta Storage Raft group successfully.
- */
- @Test
- void testLearnerLeaveAndJoinBecauseOfNetworkPartition(TestInfo testInfo)
throws Exception {
- Node firstNode = startNode(testInfo);
- Node secondNode = startNode(testInfo);
-
- firstNode.cmgManager.initCluster(List.of(firstNode.name()),
List.of(firstNode.name()), "test");
-
- assertThat(allOf(firstNode.cmgManager.onJoinReady(),
secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
-
- CompletableFuture<Set<String>> logicalTopologyNodes =
firstNode.cmgManager
- .logicalTopology()
- .thenApply(logicalTopology ->
logicalTopology.nodes().stream().map(ClusterNode::name).collect(toSet()));
-
- assertThat(logicalTopologyNodes, willBe(Set.of(firstNode.name(),
secondNode.name())));
-
- assertTrue(waitForCondition(() ->
firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())),
10_000));
-
- // Make first node lose the second node from the Physical and Logical
topologies.
- firstNode.startDroppingMessagesTo(secondNode, ScaleCubeMessage.class);
-
- assertTrue(waitForCondition(() ->
firstNode.getMetaStorageLearners().join().isEmpty(), 10_000));
-
- // Make the first node discover the second node again. The second node
should be added as a Meta Storage Learner again.
- firstNode.stopDroppingMessages();
-
- assertTrue(waitForCondition(() ->
firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())),
10_000));
- }
-
/**
* Tests that safe time is propagated from the leader to the
follower/learner.
*/
diff --git a/modules/network/build.gradle b/modules/network/build.gradle
index 27f5816aa5..2a1094850b 100644
--- a/modules/network/build.gradle
+++ b/modules/network/build.gradle
@@ -29,6 +29,7 @@ dependencies {
implementation project(':ignite-configuration-api')
implementation project(':ignite-core')
+ implementation project(':ignite-vault')
implementation libs.jetbrains.annotations
implementation libs.scalecube.cluster
implementation libs.fastutil.core
diff --git
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index bfd7eace45..9a50a77c68 100644
---
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -48,6 +48,7 @@ import
org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.messages.TestMessage;
import org.apache.ignite.internal.network.messages.TestMessagesFactory;
+import org.apache.ignite.internal.network.recovery.AllIdsAreFresh;
import org.apache.ignite.internal.network.serialization.SerializationService;
import
org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -341,7 +342,8 @@ public class ItConnectionManagerTest {
new SerializationService(registry,
mock(UserObjectSerializationContext.class)),
launchId,
consistentId,
- bootstrapFactory
+ bootstrapFactory,
+ new AllIdsAreFresh()
);
manager.start();
diff --git
a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index ec177531b7..d9e86b4417 100644
---
a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++
b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.network.scalecube;
+import static java.util.stream.Collectors.toUnmodifiableList;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -35,22 +36,30 @@ import java.lang.reflect.Method;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
+import java.util.logging.Handler;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
import org.apache.ignite.internal.network.NetworkMessageTypes;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
import org.apache.ignite.internal.network.messages.TestMessage;
import org.apache.ignite.internal.network.messages.TestMessageTypes;
import org.apache.ignite.internal.network.messages.TestMessagesFactory;
+import
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import
org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.testframework.jul.NoOpHandler;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.DefaultMessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.NodeFinder;
@@ -320,6 +329,93 @@ class ItScaleCubeNetworkMessagingTest {
assertThat(networkMessageFuture, willBe(equalTo(networkMessage)));
}
+ /**
+ * Makes sure that a node that dropped out from the Physical Topology
cannot reappear with same ID.
+ *
+ * @throws Exception in case of errors.
+ */
+ @SuppressWarnings("ConstantConditions")
+ @Test
+ public void nodeCannotReuseOldId(TestInfo testInfo) throws Exception {
+ testCluster = new Cluster(3, testInfo);
+
+ testCluster.startAwait();
+
+ String outcastName =
testCluster.members.get(testCluster.members.size() - 1).nodeName();
+
+ knockOutNode(outcastName);
+
+ IgniteBiTuple<CountDownLatch, AtomicBoolean> pair =
reanimateNode(outcastName);
+ CountDownLatch ready = pair.get1();
+ AtomicBoolean reappeared = pair.get2();
+
+ assertTrue(ready.await(10, TimeUnit.SECONDS), "Node neither
reappeared, not was rejected");
+
+ assertThat(reappeared.get(), is(false));
+ }
+
+ private void knockOutNode(String outcastName) throws InterruptedException {
+ CountDownLatch disappeared = new CountDownLatch(1);
+
+ testCluster.members.get(0).topologyService().addEventHandler(new
TopologyEventHandler() {
+ @Override
+ public void onDisappeared(ClusterNode member) {
+ if (Objects.equals(member.name(), outcastName)) {
+ disappeared.countDown();
+ }
+ }
+ });
+
+ testCluster.members.stream()
+ .filter(service -> !outcastName.equals(service.nodeName()))
+ .forEach(service -> {
+ DefaultMessagingService messagingService =
(DefaultMessagingService) service.messagingService();
+ messagingService.dropMessages((recipientConsistentId,
message) -> outcastName.equals(recipientConsistentId));
+ });
+
+ assertTrue(disappeared.await(10, TimeUnit.SECONDS), "Node did not
disappear in time");
+ }
+
+ private IgniteBiTuple<CountDownLatch, AtomicBoolean> reanimateNode(String
outcastName) {
+ CountDownLatch ready = new CountDownLatch(1);
+ AtomicBoolean reappeared = new AtomicBoolean(false);
+
+ testCluster.members.get(0).topologyService().addEventHandler(new
TopologyEventHandler() {
+ @Override
+ public void onAppeared(ClusterNode member) {
+ if (Objects.equals(member.name(), outcastName)) {
+ reappeared.compareAndSet(false, true);
+
+ ready.countDown();
+ }
+ }
+ });
+
+ Handler rejectedHandshakeHandler = new NoOpHandler() {
+ @Override
+ public void publish(LogRecord record) {
+ if (record.getMessage().startsWith("Handshake rejected by ")) {
+ ready.countDown();
+ }
+ }
+ };
+
+ Logger clientHandshakeManagerLogger =
Logger.getLogger(RecoveryClientHandshakeManager.class.getName());
+ clientHandshakeManagerLogger.addHandler(rejectedHandshakeHandler);
+
+ Logger serverHandshakeManagerLogger =
Logger.getLogger(RecoveryServerHandshakeManager.class.getName());
+ serverHandshakeManagerLogger.addHandler(rejectedHandshakeHandler);
+
+ testCluster.members.stream()
+ .filter(service -> !outcastName.equals(service.nodeName()))
+ .forEach(service -> {
+ DefaultMessagingService messagingService =
(DefaultMessagingService) service.messagingService();
+ messagingService.stopDroppingMessages();
+ });
+
+ return new IgniteBiTuple<>(ready, reappeared);
+ }
+
/**
* Tests shutdown.
*
@@ -420,7 +516,7 @@ class ItScaleCubeNetworkMessagingTest {
members = addresses.stream()
.map(addr -> startNode(testInfo, addr,
isInitial.getAndSet(false)))
- .collect(Collectors.toUnmodifiableList());
+ .collect(toUnmodifiableList());
}
/**
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
index 5aba9a2377..2ffd1220c1 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
@@ -25,6 +25,7 @@ import
org.apache.ignite.internal.network.message.InvokeResponse;
import org.apache.ignite.internal.network.message.ScaleCubeMessage;
import
org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
+import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
import org.apache.ignite.network.annotations.MessageGroup;
@@ -64,23 +65,28 @@ public class NetworkMessageTypes {
*/
public static final short HANDSHAKE_FINISH = 5;
+ /**
+ * Type for {@link HandshakeRejectedMessage}.
+ */
+ public static final short HANDSHAKE_REJECTED = 6;
+
/**
* Type for {@link AcknowledgementMessage}.
*/
- public static final short ACKNOWLEDGEMENT = 6;
+ public static final short ACKNOWLEDGEMENT = 7;
/**
* Type for {@link ClassDescriptorMessage}.
*/
- public static final short CLASS_DESCRIPTOR_MESSAGE = 7;
+ public static final short CLASS_DESCRIPTOR_MESSAGE = 8;
/**
* Type for {@link FieldDescriptorMessage}.
*/
- public static final short FIELD_DESCRIPTOR_MESSAGE = 8;
+ public static final short FIELD_DESCRIPTOR_MESSAGE = 9;
/**
* Type for {@link ClassDescriptorListMessage}.
*/
- public static final short CLASS_DESCRIPTOR_LIST_MESSAGE = 9;
+ public static final short CLASS_DESCRIPTOR_LIST_MESSAGE = 10;
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 66a5ea74b6..371c295e48 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -43,6 +43,7 @@ import
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManage
import
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManagerFactory;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import
org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.network.recovery.StaleIdDetector;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ChannelType;
@@ -87,6 +88,9 @@ public class ConnectionManager {
/** Node launch id. As opposed to {@link #consistentId}, this identifier
changes between restarts. */
private final UUID launchId;
+ /** Used to detect that a peer uses a stale ID. */
+ private final StaleIdDetector staleIdDetector;
+
/** Factory producing {@link RecoveryClientHandshakeManager} instances. */
private final RecoveryClientHandshakeManagerFactory
clientHandshakeManagerFactory;
@@ -110,13 +114,15 @@ public class ConnectionManager {
* @param launchId Launch id of this node.
* @param consistentId Consistent id of this node.
* @param bootstrapFactory Bootstrap factory.
+ * @param staleIdDetector Detects stale member IDs.
*/
public ConnectionManager(
NetworkView networkConfiguration,
SerializationService serializationService,
UUID launchId,
String consistentId,
- NettyBootstrapFactory bootstrapFactory
+ NettyBootstrapFactory bootstrapFactory,
+ StaleIdDetector staleIdDetector
) {
this(
networkConfiguration,
@@ -124,7 +130,8 @@ public class ConnectionManager {
launchId,
consistentId,
bootstrapFactory,
- new DefaultRecoveryClientHandshakeManagerFactory()
+ staleIdDetector,
+ new
DefaultRecoveryClientHandshakeManagerFactory(staleIdDetector)
);
}
@@ -136,6 +143,7 @@ public class ConnectionManager {
* @param launchId Launch id of this node.
* @param consistentId Consistent id of this node.
* @param bootstrapFactory Bootstrap factory.
+ * @param staleIdDetector Detects stale member IDs.
* @param clientHandshakeManagerFactory Factory for {@link
RecoveryClientHandshakeManager} instances.
*/
public ConnectionManager(
@@ -144,11 +152,13 @@ public class ConnectionManager {
UUID launchId,
String consistentId,
NettyBootstrapFactory bootstrapFactory,
+ StaleIdDetector staleIdDetector,
RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactory
) {
this.serializationService = serializationService;
this.launchId = launchId;
this.consistentId = consistentId;
+ this.staleIdDetector = staleIdDetector;
this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
this.networkConfiguration = networkConfiguration;
@@ -343,7 +353,7 @@ public class ConnectionManager {
}
private HandshakeManager createServerHandshakeManager() {
- return new RecoveryServerHandshakeManager(launchId, consistentId,
FACTORY, descriptorProvider);
+ return new RecoveryServerHandshakeManager(launchId, consistentId,
FACTORY, descriptorProvider, staleIdDetector);
}
/**
@@ -384,13 +394,19 @@ public class ConnectionManager {
* Factory producing vanilla {@link RecoveryClientHandshakeManager}
instances.
*/
private static class DefaultRecoveryClientHandshakeManagerFactory
implements RecoveryClientHandshakeManagerFactory {
+ private final StaleIdDetector staleIdDetector;
+
+ private DefaultRecoveryClientHandshakeManagerFactory(StaleIdDetector
staleIdDetector) {
+ this.staleIdDetector = staleIdDetector;
+ }
+
@Override
public RecoveryClientHandshakeManager create(UUID launchId,
String consistentId,
short connectionId,
RecoveryDescriptorProvider recoveryDescriptorProvider
) {
- return new RecoveryClientHandshakeManager(launchId, consistentId,
connectionId, recoveryDescriptorProvider);
+ return new RecoveryClientHandshakeManager(launchId, consistentId,
connectionId, recoveryDescriptorProvider, staleIdDetector);
}
}
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
index d04caf0a72..e8ffa5b042 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
@@ -95,7 +95,7 @@ public class DefaultRecoveryDescriptorProvider implements
RecoveryDescriptorProv
public int hashCode() {
int result = consistentId.hashCode();
result = 31 * result + launchId.hashCode();
- result = 31 * result + (int) connectionId;
+ result = 31 * result + connectionId;
result = 31 * result + (inbound ? 1 : 0);
return result;
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
index b887fc21a5..83b60e4834 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
@@ -67,7 +67,13 @@ public class HandshakeHandler extends
ChannelInboundHandlerAdapter {
/** {@inheritDoc} */
@Override
public void channelActive(ChannelHandlerContext ctx) {
- manager.onConnectionOpen();
+ try {
+ manager.onConnectionOpen();
+ } catch (Throwable e) {
+ LOG.error("Error in onConnectionOpen()", e);
+
+ throw e;
+ }
manager.handshakeFuture().whenComplete((unused, throwable) -> {
if (throwable != null) {
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/FailureHandler.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/FailureHandler.java
new file mode 100644
index 0000000000..fe83e445b8
--- /dev/null
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/FailureHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+
+/**
+ * A placeholder 'failure handler' to make it easier to recall where a real
failure handler should be used.
+ */
+// TODO: IGNITE-16899 Replace with a real FailureHandler
+class FailureHandler {
+ private static final IgniteLogger LOG =
Loggers.forClass(FailureHandler.class);
+
+ void handleFailure(Throwable t) {
+ LOG.error("Critical failure", t);
+ }
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/InMemoryStaleIds.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/InMemoryStaleIds.java
new file mode 100644
index 0000000000..7cb3aedf44
--- /dev/null
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/InMemoryStaleIds.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of {@link StaleIds} that holds its state in memory.
+ */
+public class InMemoryStaleIds implements StaleIds {
+ private final Set<String> ids = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+
+ @Override
+ public boolean isIdStale(String nodeId) {
+ return ids.contains(nodeId);
+ }
+
+ @Override
+ public void markAsStale(String nodeId) {
+ ids.add(nodeId);
+ }
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 8925b19834..cefc0a16a1 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.network.recovery;
+import static java.util.Collections.emptyList;
+
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
@@ -24,6 +26,8 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
@@ -33,8 +37,10 @@ import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.internal.network.netty.NettyUtils;
import org.apache.ignite.internal.network.netty.PipelineUtils;
import
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
+import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.OutNetworkObject;
import org.jetbrains.annotations.TestOnly;
@@ -43,6 +49,8 @@ import org.jetbrains.annotations.TestOnly;
* Recovery protocol handshake manager for a client.
*/
public class RecoveryClientHandshakeManager implements HandshakeManager {
+ private static final IgniteLogger LOG =
Loggers.forClass(RecoveryClientHandshakeManager.class);
+
/** Message factory. */
private static final NetworkMessagesFactory MESSAGE_FACTORY = new
NetworkMessagesFactory();
@@ -55,6 +63,9 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
/** Recovery descriptor provider. */
private final RecoveryDescriptorProvider recoveryDescriptorProvider;
+ /** Used to detect that a peer uses a stale ID. */
+ private final StaleIdDetector staleIdDetector;
+
/** Connection id. */
private final short connectionId;
@@ -79,6 +90,8 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
/** Recovery descriptor. */
private RecoveryDescriptor recoveryDescriptor;
+ private final FailureHandler failureHandler = new FailureHandler();
+
/**
* Constructor.
*
@@ -90,11 +103,14 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
UUID launchId,
String consistentId,
short connectionId,
- RecoveryDescriptorProvider recoveryDescriptorProvider) {
+ RecoveryDescriptorProvider recoveryDescriptorProvider,
+ StaleIdDetector staleIdDetector
+ ) {
this.launchId = launchId;
this.consistentId = consistentId;
this.connectionId = connectionId;
this.recoveryDescriptorProvider = recoveryDescriptorProvider;
+ this.staleIdDetector = staleIdDetector;
}
/** {@inheritDoc} */
@@ -111,6 +127,12 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
if (message instanceof HandshakeStartMessage) {
HandshakeStartMessage msg = (HandshakeStartMessage) message;
+ if (staleIdDetector.isIdStale(msg.launchId().toString())) {
+ handleStaleServerId(msg);
+
+ return;
+ }
+
this.remoteLaunchId = msg.launchId();
this.remoteConsistentId = msg.consistentId();
@@ -126,6 +148,19 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
return;
}
+ if (message instanceof HandshakeRejectedMessage) {
+ HandshakeRejectedMessage msg = (HandshakeRejectedMessage) message;
+
+ LOG.warn("Handshake rejected by server: {}", msg.reason());
+
+ handshakeCompleteFuture.completeExceptionally(new
HandshakeException(msg.reason()));
+
+ // TODO: IGNITE-16899 Perhaps we need to fail the node by
FailureHandler
+ failureHandler.handleFailure(new IgniteException("Handshake
rejected by server: " + msg.reason()));
+
+ return;
+ }
+
assert recoveryDescriptor != null : "Wrong client handshake flow";
if (message instanceof HandshakeFinishMessage) {
@@ -162,6 +197,25 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
ctx.fireChannelRead(message);
}
+ private void handleStaleServerId(HandshakeStartMessage msg) {
+ String reason = msg.launchId() + " is stale, server should be
restarted so that clients can connect";
+ HandshakeRejectedMessage rejectionMessage =
MESSAGE_FACTORY.handshakeRejectedMessage()
+ .reason(reason)
+ .build();
+
+ ChannelFuture sendFuture = channel.writeAndFlush(new
OutNetworkObject(rejectionMessage, emptyList(), false));
+
+ NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused,
throwable) -> {
+ if (throwable != null) {
+ handshakeCompleteFuture.completeExceptionally(
+ new HandshakeException("Failed to send handshake
rejected message: " + throwable.getMessage(), throwable)
+ );
+ } else {
+ handshakeCompleteFuture.completeExceptionally(new
HandshakeException(reason));
+ }
+ });
+ }
+
/** {@inheritDoc} */
@Override
public CompletableFuture<NettySender> handshakeFuture() {
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index 54501be1d8..395de002b7 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -17,13 +17,16 @@
package org.apache.ignite.internal.network.recovery;
+import static java.util.Collections.emptyList;
+
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
-import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.handshake.HandshakeException;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
@@ -33,8 +36,10 @@ import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.internal.network.netty.NettyUtils;
import org.apache.ignite.internal.network.netty.PipelineUtils;
import
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
+import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.OutNetworkObject;
import org.jetbrains.annotations.TestOnly;
@@ -43,6 +48,8 @@ import org.jetbrains.annotations.TestOnly;
* Recovery protocol handshake manager for a server.
*/
public class RecoveryServerHandshakeManager implements HandshakeManager {
+ private static final IgniteLogger LOG =
Loggers.forClass(RecoveryServerHandshakeManager.class);
+
/** Launch id. */
private final UUID launchId;
@@ -78,9 +85,14 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
/** Recovery descriptor provider. */
private final RecoveryDescriptorProvider recoveryDescriptorProvider;
+ /** Used to detect that a peer uses a stale ID. */
+ private final StaleIdDetector staleIdDetector;
+
/** Recovery descriptor. */
private RecoveryDescriptor recoveryDescriptor;
+ private final FailureHandler failureHandler = new FailureHandler();
+
/**
* Constructor.
*
@@ -93,12 +105,14 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
UUID launchId,
String consistentId,
NetworkMessagesFactory messageFactory,
- RecoveryDescriptorProvider recoveryDescriptorProvider
+ RecoveryDescriptorProvider recoveryDescriptorProvider,
+ StaleIdDetector staleIdDetector
) {
this.launchId = launchId;
this.consistentId = consistentId;
this.messageFactory = messageFactory;
this.recoveryDescriptorProvider = recoveryDescriptorProvider;
+ this.staleIdDetector = staleIdDetector;
}
/** {@inheritDoc} */
@@ -117,7 +131,7 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
.consistentId(consistentId)
.build();
- ChannelFuture sendFuture = channel.writeAndFlush(new
OutNetworkObject(handshakeStartMessage, Collections.emptyList(), false));
+ ChannelFuture sendFuture = channel.writeAndFlush(new
OutNetworkObject(handshakeStartMessage, emptyList(), false));
NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused,
throwable) -> {
if (throwable != null) {
@@ -134,6 +148,12 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
if (message instanceof HandshakeStartResponseMessage) {
HandshakeStartResponseMessage msg =
(HandshakeStartResponseMessage) message;
+ if (staleIdDetector.isIdStale(msg.launchId().toString())) {
+ handleStaleClientId(msg);
+
+ return;
+ }
+
this.remoteLaunchId = msg.launchId();
this.remoteConsistentId = msg.consistentId();
this.receivedCount = msg.receivedCount();
@@ -150,6 +170,19 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
return;
}
+ if (message instanceof HandshakeRejectedMessage) {
+ HandshakeRejectedMessage msg = (HandshakeRejectedMessage) message;
+
+ LOG.warn("Handshake rejected by client: {}", msg.reason());
+
+ handshakeCompleteFuture.completeExceptionally(new
HandshakeException(msg.reason()));
+
+ // TODO: IGNITE-16899 Perhaps we need to fail the node by
FailureHandler
+ failureHandler.handleFailure(new IgniteException("Handshake
rejected by client: " + msg.reason()));
+
+ return;
+ }
+
assert recoveryDescriptor != null : "Wrong server handshake flow";
if (recoveryDescriptor.unacknowledgedCount() == 0) {
@@ -159,6 +192,25 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
ctx.fireChannelRead(message);
}
+ private void handleStaleClientId(HandshakeStartResponseMessage msg) {
+ String reason = msg.launchId() + " is stale, client should be
restarted to be allowed to connect";
+ HandshakeRejectedMessage rejectionMessage =
messageFactory.handshakeRejectedMessage()
+ .reason(reason)
+ .build();
+
+ ChannelFuture sendFuture = channel.writeAndFlush(new
OutNetworkObject(rejectionMessage, emptyList(), false));
+
+ NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused,
throwable) -> {
+ if (throwable != null) {
+ handshakeCompleteFuture.completeExceptionally(
+ new HandshakeException("Failed to send handshake
rejected message: " + throwable.getMessage(), throwable)
+ );
+ } else {
+ handshakeCompleteFuture.completeExceptionally(new
HandshakeException(reason));
+ }
+ });
+ }
+
private void handshake(RecoveryDescriptor descriptor) {
PipelineUtils.afterHandshake(ctx.pipeline(), descriptor,
createMessageHandler(), messageFactory);
@@ -167,7 +219,7 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
.build();
CompletableFuture<Void> sendFuture = NettyUtils.toCompletableFuture(
- channel.write(new OutNetworkObject(response,
Collections.emptyList(), false))
+ channel.write(new OutNetworkObject(response, emptyList(),
false))
);
descriptor.acknowledge(receivedCount);
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleIdDetector.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleIdDetector.java
new file mode 100644
index 0000000000..ee04b895ea
--- /dev/null
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleIdDetector.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+/**
+ * Allows detecting whether an ID identifying a node on the networking level
is stale or not.
+ * An ID becomes stale when a node having this ID disappears from the Physical
Topology.
+ */
+@FunctionalInterface
+public interface StaleIdDetector {
+ /**
+ * Returns {@code true} iff the given ID is stale.
+ *
+ * @param nodeId ID to check.
+ * @return {@code true} iff the given ID is stale.
+ */
+ boolean isIdStale(String nodeId);
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleIds.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleIds.java
new file mode 100644
index 0000000000..8fdd4c915c
--- /dev/null
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/StaleIds.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+/**
+ * Represents a set of IDs that became stale.
+ *
+ * @see StaleIdDetector
+ */
+public interface StaleIds extends StaleIdDetector {
+ /**
+ * Marks a node ID as stale.
+ *
+ * @param nodeId ID to mark as stale.
+ */
+ void markAsStale(String nodeId);
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStateIds.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStateIds.java
new file mode 100644
index 0000000000..3f75b7b3ac
--- /dev/null
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/VaultStateIds.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * {@link StaleIds} implementation using Vault as a persistent storage.
+ */
+public class VaultStateIds implements StaleIds {
+ private static final ByteArray STALE_IDS_KEY = new
ByteArray("network.staleIds");
+
+ private static final int DEFAULT_MAX_IDS_TO_REMEMBER = 10_000;
+
+ private final VaultManager vaultManager;
+
+ private final int maxIdsToRemember;
+
+ private Set<String> staleIds;
+
+ public VaultStateIds(VaultManager vaultManager) {
+ this(vaultManager, DEFAULT_MAX_IDS_TO_REMEMBER);
+ }
+
+ public VaultStateIds(VaultManager vaultManager, int maxIdsToRemember) {
+ this.vaultManager = vaultManager;
+ this.maxIdsToRemember = maxIdsToRemember;
+ }
+
+ @Override
+ public synchronized boolean isIdStale(String nodeId) {
+ loadFromVaultIfFirstOperation();
+
+ return staleIds.contains(nodeId);
+ }
+
+ private void loadFromVaultIfFirstOperation() {
+ if (staleIds == null) {
+ staleIds = loadStaleIdsFromVault();
+ }
+ }
+
+ private Set<String> loadStaleIdsFromVault() {
+ VaultEntry entry = vaultManager.get(STALE_IDS_KEY).join();
+
+ if (entry == null) {
+ return new LinkedHashSet<>();
+ }
+
+ String[] idsArray = new String(entry.value(), UTF_8).split("\n");
+
+ Set<String> result = new LinkedHashSet<>();
+
+ Collections.addAll(result, idsArray);
+
+ return result;
+ }
+
+ @Override
+ public synchronized void markAsStale(String nodeId) {
+ loadFromVaultIfFirstOperation();
+
+ staleIds.add(nodeId);
+
+ int idsToRemove = staleIds.size() - maxIdsToRemember;
+
+ Iterator<String> iterator = staleIds.iterator();
+ for (int i = 0; i < idsToRemove; i++) {
+ iterator.next();
+ iterator.remove();
+ }
+
+ saveIdsToVault();
+ }
+
+ private void saveIdsToVault() {
+ String joinedIds = String.join("\n", staleIds);
+
+ vaultManager.put(STALE_IDS_KEY, joinedIds.getBytes(UTF_8)).join();
+ }
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
new file mode 100644
index 0000000000..88c5efe3f9
--- /dev/null
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectedMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery.message;
+
+import static
org.apache.ignite.internal.network.NetworkMessageTypes.HANDSHAKE_REJECTED;
+
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Handshake rejected message, contains the reason for a rejection.
+ * This message is sent from a server to a client or vice versa.
+ * After this message is received it makes no sense to retry connections with
the same node identity (launch ID must be changed
+ * to make a retry).
+ */
+@Transferable(HANDSHAKE_REJECTED)
+public interface HandshakeRejectedMessage extends InternalMessage {
+ /**
+ * Returns rejection reason.
+ *
+ * @return Reason of the rejection.
+ */
+ String reason();
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index c23b656afd..7f0d8a2234 100644
---
a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++
b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -40,6 +40,7 @@ import
org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.configuration.NetworkView;
import org.apache.ignite.internal.network.configuration.ScaleCubeView;
import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.recovery.StaleIds;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
import org.apache.ignite.internal.network.serialization.SerializationService;
@@ -47,6 +48,7 @@ import
org.apache.ignite.internal.network.serialization.UserObjectSerializationC
import
org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.AbstractClusterService;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.DefaultMessagingService;
import org.apache.ignite.network.NettyBootstrapFactory;
@@ -54,6 +56,7 @@ import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.NodeFinderFactory;
import org.apache.ignite.network.NodeMetadata;
+import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
/**
@@ -77,7 +80,8 @@ public class ScaleCubeClusterServiceFactory {
String consistentId,
NetworkConfiguration networkConfiguration,
NettyBootstrapFactory nettyBootstrapFactory,
- MessageSerializationRegistry serializationRegistry
+ MessageSerializationRegistry serializationRegistry,
+ StaleIds staleIds
) {
var messageFactory = new NetworkMessagesFactory();
@@ -114,7 +118,8 @@ public class ScaleCubeClusterServiceFactory {
serializationService,
launchId,
consistentId,
- nettyBootstrapFactory
+ nettyBootstrapFactory,
+ staleIds
);
connectionMgr.start();
@@ -135,7 +140,11 @@ public class ScaleCubeClusterServiceFactory {
topologyService.onMembershipEvent(event);
}
})
- .config(opts ->
opts.memberAlias(consistentId).metadataCodec(METADATA_CODEC))
+ .config(opts -> opts
+ .memberId(launchId.toString())
+ .memberAlias(consistentId)
+ .metadataCodec(METADATA_CODEC)
+ )
.transport(opts ->
opts.transportFactory(transportConfig -> transport))
.membership(opts ->
opts.seedMembers(parseAddresses(finder.findNodes())));
@@ -145,6 +154,13 @@ public class ScaleCubeClusterServiceFactory {
topologyService.setCluster(cluster);
messagingService.setConnectionManager(connectionMgr);
+ topologyService.addEventHandler(new TopologyEventHandler() {
+ @Override
+ public void onDisappeared(ClusterNode member) {
+ staleIds.markAsStale(member.id());
+ }
+ });
+
cluster.startAwait();
// emit an artificial event as if the local member has joined
the topology (ScaleCube doesn't do that)
@@ -199,6 +215,7 @@ public class ScaleCubeClusterServiceFactory {
cluster.updateMetadata(metadata).subscribe();
topologyService.updateLocalMetadata(metadata);
}
+
};
}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index 3bbf9881d8..31eabb3539 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -38,10 +38,13 @@ import
org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.messages.TestMessage;
import org.apache.ignite.internal.network.messages.TestMessagesFactory;
+import org.apache.ignite.internal.network.recovery.AllIdsAreFresh;
+import org.apache.ignite.internal.network.recovery.AllIdsAreStale;
import
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
import
org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.network.recovery.StaleIdDetector;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
import
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
@@ -254,7 +257,7 @@ public class RecoveryHandshakeTest {
}
/**
- * Tests that message was received exactly once in case if network failure
during acknowledgement.
+ * Tests that message was received exactly once in case of network failure
during acknowledgement.
*
* @param serverDidntReceiveAck {@code true} if server didn't receive the
acknowledgement, {@code false} if client didn't receive
* the acknowledgement.
@@ -351,6 +354,77 @@ public class RecoveryHandshakeTest {
assertFalse(clientSideChannel.finish());
}
+ @Test
+ public void serverFailsHandshakeIfClientIdIsAlreadySeen() throws Exception
{
+ RecoveryDescriptorProvider clientRecovery =
createRecoveryDescriptorProvider();
+ RecoveryDescriptorProvider serverRecovery =
createRecoveryDescriptorProvider();
+
+ RecoveryClientHandshakeManager clientHandshakeManager =
createRecoveryClientHandshakeManager(clientRecovery);
+ RecoveryServerHandshakeManager serverHandshakeManager =
createRecoveryServerHandshakeManager(
+ "server",
+ UUID.randomUUID(),
+ serverRecovery,
+ new AllIdsAreStale()
+ );
+
+ EmbeddedChannel clientSideChannel =
setupChannel(clientHandshakeManager, noMessageListener);
+
+ EmbeddedChannel serverSideChannel =
setupChannel(serverHandshakeManager, noMessageListener);
+
+ assertTrue(serverSideChannel.isActive());
+
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+ exchangeClientToServer(serverSideChannel, clientSideChannel);
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+ assertNull(clientSideChannel.readOutbound());
+ assertNull(serverSideChannel.readOutbound());
+
+ checkHandshakeCompletedExceptionally(serverHandshakeManager);
+ checkHandshakeCompletedExceptionally(clientHandshakeManager);
+
+ checkPipelineAfterHandshake(serverSideChannel);
+ checkPipelineAfterHandshake(clientSideChannel);
+
+ assertFalse(serverSideChannel.finish());
+ assertFalse(clientSideChannel.finish());
+ }
+
+ @Test
+ public void clientFailsHandshakeIfServerIdIsAlreadySeen() throws Exception
{
+ RecoveryDescriptorProvider clientRecovery =
createRecoveryDescriptorProvider();
+ RecoveryDescriptorProvider serverRecovery =
createRecoveryDescriptorProvider();
+
+ RecoveryClientHandshakeManager clientHandshakeManager =
createRecoveryClientHandshakeManager(
+ "client",
+ UUID.randomUUID(),
+ clientRecovery,
+ new AllIdsAreStale()
+ );
+ RecoveryServerHandshakeManager serverHandshakeManager =
createRecoveryServerHandshakeManager(serverRecovery);
+
+ EmbeddedChannel clientSideChannel =
setupChannel(clientHandshakeManager, noMessageListener);
+
+ EmbeddedChannel serverSideChannel =
setupChannel(serverHandshakeManager, noMessageListener);
+
+ assertTrue(serverSideChannel.isActive());
+
+ exchangeServerToClient(serverSideChannel, clientSideChannel);
+ exchangeClientToServer(serverSideChannel, clientSideChannel);
+
+ assertNull(clientSideChannel.readOutbound());
+ assertNull(serverSideChannel.readOutbound());
+
+ checkHandshakeCompletedExceptionally(serverHandshakeManager);
+ checkHandshakeCompletedExceptionally(clientHandshakeManager);
+
+ checkPipelineAfterHandshake(serverSideChannel);
+ checkPipelineAfterHandshake(clientSideChannel);
+
+ assertFalse(serverSideChannel.finish());
+ assertFalse(clientSideChannel.finish());
+ }
+
/** Message listener that accepts a specific message only once. */
private static class MessageListener implements Consumer<InNetworkObject> {
/** Expected message. */
@@ -396,6 +470,14 @@ public class RecoveryHandshakeTest {
assertFalse(handshakeFuture.isCancelled());
}
+ private void checkHandshakeCompletedExceptionally(HandshakeManager
manager) {
+ CompletableFuture<NettySender> handshakeFuture =
manager.handshakeFuture();
+
+ assertTrue(handshakeFuture.isDone());
+ assertTrue(handshakeFuture.isCompletedExceptionally());
+ assertFalse(handshakeFuture.isCancelled());
+ }
+
private void addUnacknowledgedMessages(RecoveryDescriptor
recoveryDescriptor) {
TestMessage msg =
TEST_MESSAGES_FACTORY.testMessage().msg("test").build();
recoveryDescriptor.add(new OutNetworkObject(msg,
Collections.emptyList()));
@@ -447,7 +529,12 @@ public class RecoveryHandshakeTest {
private RecoveryClientHandshakeManager
createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
RecoveryDescriptorProvider provider) {
- return new RecoveryClientHandshakeManager(launchId, consistentId,
CONNECTION_ID, provider);
+ return createRecoveryClientHandshakeManager(consistentId, launchId,
provider, new AllIdsAreFresh());
+ }
+
+ private RecoveryClientHandshakeManager
createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
+ RecoveryDescriptorProvider provider, StaleIdDetector
staleIdDetector) {
+ return new RecoveryClientHandshakeManager(launchId, consistentId,
CONNECTION_ID, provider, staleIdDetector);
}
private RecoveryServerHandshakeManager
createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
@@ -456,7 +543,12 @@ public class RecoveryHandshakeTest {
private RecoveryServerHandshakeManager
createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
RecoveryDescriptorProvider provider) {
- return new RecoveryServerHandshakeManager(launchId, consistentId,
MESSAGE_FACTORY, provider);
+ return createRecoveryServerHandshakeManager(consistentId, launchId,
provider, new AllIdsAreFresh());
+ }
+
+ private RecoveryServerHandshakeManager
createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
+ RecoveryDescriptorProvider provider, StaleIdDetector
staleIdDetector) {
+ return new RecoveryServerHandshakeManager(launchId, consistentId,
MESSAGE_FACTORY, provider, staleIdDetector);
}
private RecoveryDescriptorProvider createRecoveryDescriptorProvider() {
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStateIdsTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStateIdsTest.java
new file mode 100644
index 0000000000..cd2437a403
--- /dev/null
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/VaultStateIdsTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class VaultStateIdsTest {
+ @Mock
+ private VaultManager vaultManager;
+
+ private final ByteArray staleIdsKey = new ByteArray("network.staleIds");
+
+ private VaultStateIds staleIds;
+
+ @BeforeEach
+ void createObjectToTest() {
+ staleIds = new VaultStateIds(vaultManager);
+ }
+
+ @Test
+ void consultsVaultWhenCheckingForStaleness() {
+ doReturn(completedFuture(new VaultEntry(staleIdsKey,
"id1\nid2\nid3".getBytes(UTF_8))))
+ .when(vaultManager).get(staleIdsKey);
+
+ assertThat(staleIds.isIdStale("id1"), is(true));
+ assertThat(staleIds.isIdStale("id2"), is(true));
+ assertThat(staleIds.isIdStale("id3"), is(true));
+ assertThat(staleIds.isIdStale("id10"), is(false));
+ }
+
+ @Test
+ void cachesVaultStateInMemory() {
+ doReturn(completedFuture(new VaultEntry(staleIdsKey,
"id1\nid2\nid3".getBytes(UTF_8))))
+ .when(vaultManager).get(staleIdsKey);
+
+ staleIds.isIdStale("id1");
+ staleIds.isIdStale("id2");
+ staleIds.isIdStale("id3");
+
+ verify(vaultManager, times(1)).get(any());
+ }
+
+ @Test
+ void savesNewStaleIdsToVault() {
+ doReturn(completedFuture(null)).when(vaultManager).get(staleIdsKey);
+ doReturn(completedFuture(null))
+ .when(vaultManager).put(staleIdsKey, "id2".getBytes(UTF_8));
+ doReturn(completedFuture(null))
+ .when(vaultManager).put(staleIdsKey,
"id2\nid1".getBytes(UTF_8));
+
+ staleIds.markAsStale("id2");
+ staleIds.markAsStale("id1");
+ }
+
+ @Test
+ void respectsMaxIdsLimit() {
+ staleIds = new VaultStateIds(vaultManager, 2);
+
+ doReturn(completedFuture(null)).when(vaultManager).get(staleIdsKey);
+
+ AtomicReference<String> lastSavedIds = new AtomicReference<>();
+
+ doAnswer(invocation -> {
+ byte[] value = invocation.getArgument(1);
+
+ lastSavedIds.set(new String(value, UTF_8));
+
+ return completedFuture(null);
+ }).when(vaultManager).put(eq(staleIdsKey), any());
+
+ staleIds.markAsStale("id3");
+ staleIds.markAsStale("id2");
+ staleIds.markAsStale("id1");
+
+ assertThat(lastSavedIds.get(), is("id2\nid1"));
+ }
+
+ @Test
+ void loadsBeforeDoingFirstSave() {
+ lenient().doReturn(completedFuture(new VaultEntry(staleIdsKey,
"id1".getBytes(UTF_8))))
+ .when(vaultManager).get(staleIdsKey);
+
doReturn(completedFuture(null)).when(vaultManager).put(eq(staleIdsKey), any());
+
+ staleIds.markAsStale("id2");
+
+ verify(vaultManager).put(staleIdsKey, "id1\nid2".getBytes(UTF_8));
+ }
+}
diff --git
a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
index 5987dcf41f..c9a89dc01d 100644
---
a/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java
@@ -33,7 +33,6 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -47,9 +46,11 @@ import
org.apache.ignite.internal.network.messages.TestMessageSerializationFacto
import org.apache.ignite.internal.network.messages.TestMessageTypes;
import org.apache.ignite.internal.network.messages.TestMessagesFactory;
import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.recovery.AllIdsAreFresh;
import
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
import
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManagerFactory;
import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.recovery.StaleIdDetector;
import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
import
org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
import org.apache.ignite.internal.network.serialization.SerializationService;
@@ -101,7 +102,7 @@ class DefaultMessagingServiceTest {
);
@BeforeEach
- void setUp() throws InterruptedException, ExecutionException {
+ void setUp() {
lenient().when(topologyService.getByConsistentId(eq(senderNode.name()))).thenReturn(senderNode);
}
@@ -290,13 +291,17 @@ class DefaultMessagingServiceTest {
NettyBootstrapFactory bootstrapFactory = new
NettyBootstrapFactory(networkConfig, eventLoopGroupNamePrefix);
bootstrapFactory.start();
+ StaleIdDetector staleIdDetector = new AllIdsAreFresh();
+
+ UUID launchId = UUID.randomUUID();
ConnectionManager connectionManager = new ConnectionManager(
networkConfig.value(),
serializationService,
- UUID.randomUUID(),
+ launchId,
node.name(),
bootstrapFactory,
- clientHandshakeManagerFactoryAdding(beforeHandshake)
+ staleIdDetector,
+ clientHandshakeManagerFactoryAdding(beforeHandshake,
staleIdDetector)
);
connectionManager.start();
@@ -305,7 +310,8 @@ class DefaultMessagingServiceTest {
return new Services(connectionManager, messagingService);
}
- private static RecoveryClientHandshakeManagerFactory
clientHandshakeManagerFactoryAdding(Runnable beforeHandshake) {
+ private static RecoveryClientHandshakeManagerFactory
clientHandshakeManagerFactoryAdding(Runnable beforeHandshake,
+ StaleIdDetector staleIdDetector) {
return new RecoveryClientHandshakeManagerFactory() {
@Override
public RecoveryClientHandshakeManager create(
@@ -313,10 +319,17 @@ class DefaultMessagingServiceTest {
String consistentId,
short connectionId,
RecoveryDescriptorProvider recoveryDescriptorProvider) {
- return new RecoveryClientHandshakeManager(launchId,
consistentId, connectionId, recoveryDescriptorProvider) {
+ return new RecoveryClientHandshakeManager(
+ launchId,
+ consistentId,
+ connectionId,
+ recoveryDescriptorProvider,
+ staleIdDetector
+ ) {
@Override
protected void finishHandshake() {
beforeHandshake.run();
+
super.finishHandshake();
}
};
diff --git
a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/recovery/AllIdsAreFresh.java
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/recovery/AllIdsAreFresh.java
new file mode 100644
index 0000000000..fbbe03d201
--- /dev/null
+++
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/recovery/AllIdsAreFresh.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+/**
+ * {@link StaleIdDetector} that reports all IDs as fresh (i.e. not stale).
+ *
+ * <p>Lives in the test fixtures source set; the answer is constant and does not depend on the ID passed in.
+ */
+public class AllIdsAreFresh implements StaleIdDetector {
+ // Always answers "not stale"; the nodeId argument is never inspected.
+ @Override
+ public boolean isIdStale(String nodeId) {
+ return false;
+ }
+}
diff --git
a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/recovery/AllIdsAreStale.java
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/recovery/AllIdsAreStale.java
new file mode 100644
index 0000000000..be7d4eb099
--- /dev/null
+++
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/recovery/AllIdsAreStale.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+/**
+ * {@link StaleIdDetector} that reports all IDs as stale.
+ *
+ * <p>Lives in the test fixtures source set; the answer is constant and does not depend on the ID passed in.
+ */
+public class AllIdsAreStale implements StaleIdDetector {
+ // Always answers "stale"; the nodeId argument is never inspected.
+ @Override
+ public boolean isIdStale(String nodeId) {
+ return true;
+ }
+}
diff --git
a/modules/network/src/testFixtures/java/org/apache/ignite/network/scalecube/TestScaleCubeClusterServiceFactory.java
b/modules/network/src/testFixtures/java/org/apache/ignite/network/scalecube/TestScaleCubeClusterServiceFactory.java
index 57cc3716dc..19d94766df 100644
---
a/modules/network/src/testFixtures/java/org/apache/ignite/network/scalecube/TestScaleCubeClusterServiceFactory.java
+++
b/modules/network/src/testFixtures/java/org/apache/ignite/network/scalecube/TestScaleCubeClusterServiceFactory.java
@@ -19,7 +19,6 @@ package org.apache.ignite.network.scalecube;
import io.scalecube.cluster.ClusterConfig;
import org.apache.ignite.internal.network.configuration.ClusterMembershipView;
-import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
/**
* Scalecube test factory. Provides fast detection time.
diff --git
a/modules/network/src/testFixtures/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
b/modules/network/src/testFixtures/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
index 97e95ca0bc..f298c7c24f 100644
---
a/modules/network/src/testFixtures/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
+++
b/modules/network/src/testFixtures/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
@@ -30,6 +30,8 @@ import
org.apache.ignite.internal.configuration.ConfigurationManager;
import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.configuration.NodeFinderType;
+import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
+import org.apache.ignite.internal.network.recovery.StaleIds;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.AbstractClusterService;
import org.apache.ignite.network.ClusterService;
@@ -75,9 +77,23 @@ public class ClusterServiceTestUtils {
* @param nodeFinder Node finder.
*/
public static ClusterService clusterService(TestInfo testInfo, int port,
NodeFinder nodeFinder) {
+ return clusterService(testInfo, port, nodeFinder, new
InMemoryStaleIds());
+ }
+
+ /**
+ * Creates a cluster service and required node configuration manager
beneath it. Populates node configuration with specified port.
+ * Manages configuration manager lifecycle: on cluster service start
starts node configuration manager, on cluster service stop - stops
+ * node configuration manager.
+ *
+ * @param testInfo Test info.
+ * @param port Local port.
+ * @param nodeFinder Node finder.
+ * @param staleIds Used to track stale launch IDs.
+ */
+ public static ClusterService clusterService(TestInfo testInfo, int port,
NodeFinder nodeFinder, StaleIds staleIds) {
String nodeName = testNodeName(testInfo, port);
- return clusterService(nodeName, port, nodeFinder);
+ return clusterService(nodeName, port, nodeFinder, staleIds);
}
/**
@@ -89,6 +105,19 @@ public class ClusterServiceTestUtils {
* @return Cluster service instance.
*/
public static ClusterService clusterService(String nodeName, int port,
NodeFinder nodeFinder) {
+ return clusterService(nodeName, port, nodeFinder, new
InMemoryStaleIds());
+ }
+
+ /**
+ * Creates a cluster service with predefined name.
+ *
+ * @param nodeName Node name.
+ * @param port Local port.
+ * @param nodeFinder Node finder.
+ * @param staleIds Used to track stale launch IDs.
+ * @return Cluster service instance.
+ */
+ private static ClusterService clusterService(String nodeName, int port,
NodeFinder nodeFinder, StaleIds staleIds) {
ConfigurationManager nodeConfigurationMgr = new ConfigurationManager(
Collections.singleton(NetworkConfiguration.KEY),
Set.of(),
@@ -107,7 +136,8 @@ public class ClusterServiceTestUtils {
nodeName,
networkConfiguration,
bootstrapFactory,
- serializationRegistry
+ serializationRegistry,
+ staleIds
);
assert nodeFinder instanceof StaticNodeFinder : "Only StaticNodeFinder
is supported at the moment";
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
index 9eaa98ac06..8626d2e22f 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
@@ -47,6 +47,7 @@ import
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.service.CommandClosure;
@@ -172,7 +173,8 @@ public class ItTruncateSuffixAndRestartTest {
nodeName,
networkConfiguration,
nettyBootstrapFactory,
- defaultSerializationRegistry()
+ defaultSerializationRegistry(),
+ new InMemoryStaleIds()
);
clusterSvc.start();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
index f6cf2b43ab..1cc34ef03e 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
@@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
@@ -237,7 +238,7 @@ class ItLogicalTopologyTest extends
ClusterPerTestIntegrationTest {
}
@Test
- void nodeReturnedToPhysicalTopologyReturnsToLogicalTopology() throws
Exception {
+ void nodeReturnedToPhysicalTopologyDoesNotReturnToLogicalTopology() throws
Exception {
IgniteImpl entryNode = node(0);
IgniteImpl secondIgnite = startNode(1);
@@ -257,7 +258,7 @@ class ItLogicalTopologyTest extends
ClusterPerTestIntegrationTest {
entryNode.stopDroppingMessages();
- assertTrue(secondIgniteAppeared.await(10, TimeUnit.SECONDS), "Did not
see second node coming back in time");
+ assertFalse(secondIgniteAppeared.await(3, TimeUnit.SECONDS), "Second
node returned to logical topology");
}
private static void makeSecondNodeDisappearForFirstNode(IgniteImpl
firstIgnite, IgniteImpl secondIgnite) throws InterruptedException {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 9db9950775..c02d867fa3 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -89,6 +89,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.recovery.VaultStateIds;
import org.apache.ignite.internal.raft.Loza;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -283,7 +284,8 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
name,
networkConfiguration,
nettyBootstrapFactory,
- defaultSerializationRegistry()
+ defaultSerializationRegistry(),
+ new VaultStateIds(vault)
);
HybridClock hybridClock = new HybridClockImpl();
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f785d423aa..b91fd843e5 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -85,6 +85,7 @@ import
org.apache.ignite.internal.metrics.configuration.MetricConfiguration;
import org.apache.ignite.internal.metrics.sources.JvmMetricSource;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import
org.apache.ignite.internal.network.configuration.NetworkConfigurationSchema;
+import org.apache.ignite.internal.network.recovery.VaultStateIds;
import org.apache.ignite.internal.raft.Loza;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
@@ -324,7 +325,8 @@ public class IgniteImpl implements Ignite {
name,
networkConfiguration,
nettyBootstrapFactory,
- serializationRegistry
+ serializationRegistry,
+ new VaultStateIds(vaultMgr)
);
computeComponent = new ComputeComponentImpl(
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
index 036638970b..71dbcb273a 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java
@@ -19,6 +19,7 @@ package org.apache.ignite.distributed;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestInfo;
@@ -51,9 +52,11 @@ public class ItTxDistributedTestThreeNodesThreeReplicas
extends ItTxDistributedT
@Override
@AfterEach
public void after() throws Exception {
- assertTrue(IgniteTestUtils.waitForCondition(() ->
assertPartitionsSame(accounts, 0), 5_000));
- assertTrue(IgniteTestUtils.waitForCondition(() ->
assertPartitionsSame(customers, 0), 5_000));
-
- super.after();
+ try {
+ assertTrue(IgniteTestUtils.waitForCondition(() ->
assertPartitionsSame(accounts, 0), TimeUnit.SECONDS.toMillis(5)));
+ assertTrue(IgniteTestUtils.waitForCondition(() ->
assertPartitionsSame(customers, 0), TimeUnit.SECONDS.toMillis(5)));
+ } finally {
+ super.after();
+ }
}
}