This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new febb432e5e IGNITE-22804 Refuse network handshake if nodes have
different cluster IDs (#4181)
febb432e5e is described below
commit febb432e5e0636ad31e946067a9f21e8f5d56d7d
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Aug 6 14:27:59 2024 +0400
IGNITE-22804 Refuse network handshake if nodes have different cluster IDs
(#4181)
---
.../cluster/management/ItClusterIdChangeTest.java | 65 ++++++++++++++++
.../cluster/management/ClusterIdHolder.java} | 31 +++++---
.../cluster/management/ClusterIdService.java | 54 ++++++++++++++
.../cluster/management/ClusterIdStore.java} | 20 ++---
.../management/ClusterManagementGroupManager.java | 22 ++++--
.../raft/ClusterStateStorageManager.java | 2 +-
.../cluster/management/ClusterIdHolderTest.java} | 35 ++++++---
.../ClusterManagementGroupManagerTest.java | 3 +-
.../internal/cluster/management/MockNode.java | 5 +-
...niteDistributionZoneManagerNodeRestartTest.java | 11 ++-
.../ItMetaStorageMultipleNodesAbstractTest.java | 4 +-
.../metastorage/impl/ItMetaStorageWatchTest.java | 4 +-
.../internal/network/ClusterIdSupplier.java} | 17 ++---
.../network/netty/ItConnectionManagerTest.java | 2 +
.../network/scalecube/ItNodeRestartsTest.java | 14 +++-
.../scalecube/ItScaleCubeNetworkMessagingTest.java | 87 ++++++++++++++++++++--
.../network/message/ClusterNodeMessage.java | 5 ++
.../internal/network/netty/ConnectionManager.java | 13 ++++
.../recovery/RecoveryClientHandshakeManager.java | 24 ++++++
.../recovery/RecoveryServerHandshakeManager.java | 16 ++--
.../recovery/message/HandshakeRejectionReason.java | 13 +++-
.../recovery/message/HandshakeStartMessage.java | 6 ++
.../scalecube/ScaleCubeClusterServiceFactory.java | 4 +
.../network/DefaultMessagingServiceTest.java | 15 +++-
.../network/netty/RecoveryHandshakeTest.java | 6 ++
.../RecoveryClientHandshakeManagerTest.java | 26 +++++++
.../RecoveryServerHandshakeManagerTest.java | 5 ++
.../network/ConstantClusterIdSupplier.java} | 33 +++++---
.../network/utils/ClusterServiceTestUtils.java | 37 ++++++++-
.../replicator/ItReplicaLifecycleTest.java | 11 ++-
.../raft/ItTruncateSuffixAndRestartTest.java | 5 +-
.../ItDistributedConfigurationPropertiesTest.java | 4 +-
.../ItDistributedConfigurationStorageTest.java | 4 +-
.../runner/app/ItIgniteNodeRestartTest.java | 14 +++-
.../org/apache/ignite/internal/app/IgniteImpl.java | 27 +++++--
.../rebalance/ItRebalanceDistributedTest.java | 11 ++-
36 files changed, 551 insertions(+), 104 deletions(-)
diff --git
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterIdChangeTest.java
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterIdChangeTest.java
new file mode 100644
index 0000000000..e94f83388d
--- /dev/null
+++
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterIdChangeTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.network.DefaultMessagingService;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that, if two nodes have different cluster IDs, they cannot communicate
+ * with each other over the network.
+ *
+ * <p>The test starts a two-node cluster, replaces the cluster ID held by the second node,
+ * closes every channel held by the first node's connection manager and then checks that
+ * none of those channels becomes open again.
+ */
+class ItClusterIdChangeTest extends ClusterPerTestIntegrationTest {
+ /** Two nodes: one whose cluster ID gets changed and one that tries to talk to it. */
+ @Override
+ protected int initialNodes() {
+ return 2;
+ }
+
+ /**
+ * Verifies that, once the cluster IDs of the two nodes diverge, the first node's
+ * channels do not become open again.
+ */
+ @Test
+ void nodesWithDifferentClusterIdsCannotCommunicate() throws Exception {
+ IgniteImpl node1 = node(0);
+ IgniteImpl node2 = node(1);
+
+ // Give node 2 a fresh random cluster ID so that it no longer matches node 1's ID.
+ node2.clusterIdService().clusterId(UUID.randomUUID());
+
+ ConnectionManager connectionManager1 = ((DefaultMessagingService)
node1.clusterService().messagingService()).connectionManager();
+
+ // Close all channels currently held by node 1, so that any further communication
+ // presumably has to go through a new handshake (which is what should now be refused).
+ //noinspection rawtypes
+ CompletableFuture[] closeFutures =
connectionManager1.channels().values().stream()
+ .map(NettySender::closeAsync)
+ .toArray(CompletableFuture[]::new);
+ assertThat(allOf(closeFutures), willCompleteSuccessfully());
+
+ // For up to 1 second, no channel of node 1 is expected to be open.
+ // NOTE(review): this check also passes if nothing attempts to reconnect within that window;
+ // presumably background traffic (e.g. membership pings) triggers reconnection attempts -
+ // confirm, or send a message explicitly to make the attempt deterministic.
+ assertFalse(
+ waitForCondition(() ->
connectionManager1.channels().values().stream().anyMatch(NettySender::isOpen),
SECONDS.toMillis(1)),
+ "Nodes with different clusterIDs are able to communicate"
+ );
+ }
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdHolder.java
similarity index 57%
copy from
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
copy to
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdHolder.java
index b88c2c3099..cf5fefe443 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdHolder.java
@@ -15,18 +15,29 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network.recovery.message;
+package org.apache.ignite.internal.cluster.management;
-import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.network.message.ClusterNodeMessage;
+import java.util.UUID;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
/**
- * Handshake start message, contains info about the node.
- * This message is sent from a server to a client at the connection opening.
+ * Holds cluster ID (allowing it to be changed).
*/
-@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends InternalMessage {
- /** Returns the server node that sends this. */
- ClusterNodeMessage serverNode();
+public class ClusterIdHolder implements ClusterIdSupplier, ClusterIdStore {
+ /**
+ * Current cluster ID; {@code null} until {@link #clusterId(UUID)} is called for the first time.
+ * Declared {@code volatile} so that a value written by one thread is visible to readers in other threads.
+ */
+ private volatile UUID clusterId;
+
+ /**
+ * Returns the most recently stored cluster ID.
+ *
+ * @return Cluster ID, or {@code null} if no ID has been stored yet.
+ */
+ @Override
+ public UUID clusterId() {
+ return clusterId;
+ }
+
+ /**
+ * Updates the stored cluster ID, replacing any previously stored value.
+ *
+ * @param newClusterId New cluster ID.
+ */
+ @Override
+ public void clusterId(UUID newClusterId) {
+ clusterId = newClusterId;
+ }
}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdService.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdService.java
new file mode 100644
index 0000000000..9a001d5d8c
--- /dev/null
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdService.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.manager.IgniteComponent;
+
+/**
+ * Component that keeps an in-memory copy of the cluster ID. The ID is used to restrict which
+ * nodes can connect to this node and which nodes this node can connect to.
+ *
+ * <p>On start, the ID is initialized from the cluster state persisted in {@link ClusterStateStorage}
+ * (if such state exists). After that, it can be updated through {@code clusterId(UUID)}.
+ */
+public class ClusterIdService extends ClusterIdHolder implements
IgniteComponent {
+ /** Storage from which the persisted cluster state is read on start. */
+ private final ClusterStateStorage clusterStateStorage;
+
+ /**
+ * Constructor.
+ *
+ * @param clusterStateStorage Storage from which the persisted cluster state is read on start.
+ */
+ public ClusterIdService(ClusterStateStorage clusterStateStorage) {
+ this.clusterStateStorage = clusterStateStorage;
+ }
+
+ /**
+ * Loads the persisted cluster state and, if it is present, adopts its cluster ID.
+ * If there is no persisted state, the held ID is left unchanged ({@code null} for a fresh instance).
+ *
+ * <p>NOTE(review): the storage is read synchronously here, so {@code clusterStateStorage} presumably
+ * has to be started before this component. The tests in this change start it first; confirm the
+ * startup ordering in production wiring.
+ */
+ @Override
+ public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
+ var clusterStateManager = new
ClusterStateStorageManager(clusterStateStorage);
+
+ // null means no cluster state has been persisted yet.
+ ClusterState clusterState = clusterStateManager.getClusterState();
+ if (clusterState != null) {
+ clusterId(clusterState.clusterTag().clusterId());
+ }
+
+ return nullCompletedFuture();
+ }
+
+ /** Nothing to release; completes immediately. */
+ @Override
+ public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
+ return nullCompletedFuture();
+ }
+}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdStore.java
similarity index 57%
copy from
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
copy to
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdStore.java
index b88c2c3099..36cf23192b 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterIdStore.java
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network.recovery.message;
+package org.apache.ignite.internal.cluster.management;
-import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.network.message.ClusterNodeMessage;
+import java.util.UUID;
/**
- * Handshake start message, contains info about the node.
- * This message is sent from a server to a client at the connection opening.
+ * Knows how to store a cluster ID.
*/
-@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends InternalMessage {
- /** Returns the server node that sends this. */
- ClusterNodeMessage serverNode();
+public interface ClusterIdStore {
+ /**
+ * Updates the stored cluster ID.
+ *
+ * <p>NOTE(review): nullability of {@code newClusterId} is not specified. The callers in this change
+ * pass IDs taken from the cluster state's cluster tag.
+ *
+ * @param newClusterId New cluster ID.
+ */
+ void clusterId(UUID newClusterId);
}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 6e01489cf4..61f56c9e7b 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -147,12 +147,14 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
/** Local node's attributes. */
private final NodeAttributes nodeAttributes;
- /** Future that resolves into the initial cluster configuration in HOCON
format. */
- private final CompletableFuture<String> initialClusterConfigurationFuture
= new CompletableFuture<>();
-
/** Failure processor that is used to handle critical errors. */
private final FailureProcessor failureProcessor;
+ private final ClusterIdStore clusterIdStore;
+
+ /** Future that resolves into the initial cluster configuration in HOCON
format. */
+ private final CompletableFuture<String> initialClusterConfigurationFuture
= new CompletableFuture<>();
+
private final CmgMessageHandler cmgMessageHandler;
/** Constructor. */
@@ -166,7 +168,8 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
ValidationManager validationManager,
ClusterManagementConfiguration configuration,
NodeAttributes nodeAttributes,
- FailureProcessor failureProcessor
+ FailureProcessor failureProcessor,
+ ClusterIdHolder clusterIdChanger
) {
this.clusterService = clusterService;
this.clusterInitializer = clusterInitializer;
@@ -178,6 +181,7 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
this.localStateStorage = new LocalStateStorage(vault);
this.nodeAttributes = nodeAttributes;
this.failureProcessor = failureProcessor;
+ this.clusterIdStore = clusterIdChanger;
scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
NamedThreadFactory.create(clusterService.nodeName(),
"cmg-manager", LOG)
@@ -224,7 +228,8 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
LogicalTopology logicalTopology,
ClusterManagementConfiguration configuration,
NodeAttributes nodeAttributes,
- FailureProcessor failureProcessor
+ FailureProcessor failureProcessor,
+ ClusterIdHolder clusterIdChanger
) {
this(
vault,
@@ -236,7 +241,8 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
new ValidationManager(new
ClusterStateStorageManager(clusterStateStorage), logicalTopology),
configuration,
nodeAttributes,
- failureProcessor
+ failureProcessor,
+ clusterIdChanger
);
}
@@ -452,6 +458,8 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
localStateStorage.saveLocalState(localState);
+ clusterIdStore.clusterId(state.clusterTag().clusterId());
+
return joinCluster(service, state.clusterTag());
});
}
@@ -733,6 +741,8 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
localStateStorage.saveLocalState(localState);
+ clusterIdStore.clusterId(state.clusterTag().clusterId());
+
return joinCluster(service, state.clusterTag());
});
}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
index 2e32262e1d..bbe8697912 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ClusterStateStorageManager.java
@@ -51,7 +51,7 @@ public class ClusterStateStorageManager {
* @return Current state or {@code null} if it has not been initialized.
*/
@Nullable
- ClusterState getClusterState() {
+ public ClusterState getClusterState() {
byte[] value = storage.get(CMG_STATE_KEY);
return value == null ? null : fromBytes(value);
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterIdHolderTest.java
similarity index 55%
copy from
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
copy to
modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterIdHolderTest.java
index b88c2c3099..4a2bf0694b 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterIdHolderTest.java
@@ -15,18 +15,29 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network.recovery.message;
+package org.apache.ignite.internal.cluster.management;
-import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.network.message.ClusterNodeMessage;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
-/**
- * Handshake start message, contains info about the node.
- * This message is sent from a server to a client at the connection opening.
- */
-@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends InternalMessage {
- /** Returns the server node that sends this. */
- ClusterNodeMessage serverNode();
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link ClusterIdHolder}.
+ */
+class ClusterIdHolderTest {
+ // Holder under test. With JUnit Jupiter's default per-method lifecycle, each test presumably gets a fresh one.
+ private final ClusterIdHolder holder = new ClusterIdHolder();
+
+ /** A newly created holder has no cluster ID. */
+ @Test
+ void clusterIdIsOriginallyNull() {
+ assertThat(holder.clusterId(), is(nullValue()));
+ }
+
+ /** A stored cluster ID is returned by the getter. */
+ @Test
+ void clusterIdGetsUpdated() {
+ UUID newId = UUID.randomUUID();
+
+ holder.clusterId(newId);
+
+ assertThat(holder.clusterId(), is(newId));
+ }
}
diff --git
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
index 8ccb3b1c73..075d76537c 100644
---
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
+++
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManagerTest.java
@@ -112,7 +112,8 @@ class ClusterManagementGroupManagerTest extends
BaseIgniteAbstractTest {
logicalTopology,
clusterManagementConfiguration,
nodeAttributes,
- failureProcessor
+ failureProcessor,
+ new ClusterIdHolder()
);
assertThat(clusterService.startAsync(componentContext),
willCompleteSuccessfully());
diff --git
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 75571ad021..942254b2f6 100644
---
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -97,6 +97,8 @@ public class MockNode {
var vaultManager = new VaultManager(new
PersistentVaultService(vaultDir));
+ var clusterIdHolder = new ClusterIdHolder();
+
this.clusterService = ClusterServiceTestUtils.clusterService(nodeName,
addr.port(), nodeFinder);
var raftManager = TestLozaFactory.create(clusterService,
raftConfiguration, this.workDir, new HybridClockImpl());
@@ -114,7 +116,8 @@ public class MockNode {
new LogicalTopologyImpl(clusterStateStorage),
cmgConfiguration,
new NodeAttributesCollector(nodeAttributes,
storageProfilesConfiguration),
- failureProcessor
+ failureProcessor,
+ clusterIdHolder
);
components = List.of(
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 6475baac17..2e2694a4fe 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -84,6 +84,7 @@ import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
+import org.apache.ignite.internal.cluster.management.ClusterIdService;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
@@ -183,6 +184,10 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
VaultManager vault = createVault(dir);
+ var clusterStateStorage = new TestClusterStateStorage();
+
+ var clusterIdService = new ClusterIdService(clusterStateStorage);
+
ConfigurationModules modules = loadConfigurationModules(log,
Thread.currentThread().getContextClassLoader());
Path configFile =
workDir.resolve(TestIgnitionManager.DEFAULT_CONFIG_NAME);
@@ -216,12 +221,11 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
nettyBootstrapFactory,
defaultSerializationRegistry(),
new VaultStaleIds(vault),
+ clusterIdService,
new NoOpCriticalWorkerRegistry(),
mock(FailureProcessor.class)
);
- var clusterStateStorage = new TestClusterStateStorage();
-
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var cmgManager = mock(ClusterManagementGroupManager.class);
@@ -294,9 +298,10 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
// Start the remaining components.
List<IgniteComponent> otherComponents = List.of(
+ clusterStateStorage,
+ clusterIdService,
nettyBootstrapFactory,
clusterSvc,
- clusterStateStorage,
cmgManager,
metastore,
clusterCfgMgr,
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index dd5d620b4b..01c0439a7a 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
@@ -182,7 +183,8 @@ public abstract class
ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstr
logicalTopology,
cmgConfiguration,
new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
- failureProcessor
+ failureProcessor,
+ new ClusterIdHolder()
);
var logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index 05b7c8f1b5..d70b917c08 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -48,6 +48,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
+import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
@@ -170,7 +171,8 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
logicalTopology,
cmgConfiguration,
new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
- failureProcessor
+ failureProcessor,
+ new ClusterIdHolder()
);
components.add(cmgManager);
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/ClusterIdSupplier.java
similarity index 57%
copy from
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
copy to
modules/network-api/src/main/java/org/apache/ignite/internal/network/ClusterIdSupplier.java
index b88c2c3099..e66dd3d72c 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/ClusterIdSupplier.java
@@ -15,18 +15,15 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network.recovery.message;
+package org.apache.ignite.internal.network;
-import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.network.message.ClusterNodeMessage;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
/**
- * Handshake start message, contains info about the node.
- * This message is sent from a server to a client at the connection opening.
+ * Supplies the ID of the cluster that this node belongs to.
*/
-@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends InternalMessage {
- /** Returns the server node that sends this. */
- ClusterNodeMessage serverNode();
+public interface ClusterIdSupplier {
+ /**
+ * Returns the ID of the cluster.
+ *
+ * @return Cluster ID, or {@code null} if the node has never joined a cluster.
+ */
+ @Nullable UUID clusterId();
}
diff --git
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 98d4bc0241..b9421de14e 100644
---
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.network.netty;
import static java.util.Collections.emptyList;
+import static
org.apache.ignite.internal.network.ConstantClusterIdSupplier.withoutClusterId;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -531,6 +532,7 @@ public class ItConnectionManagerTest extends
BaseIgniteAbstractTest {
consistentId,
bootstrapFactory,
new AllIdsAreFresh(),
+ withoutClusterId(),
mock(FailureProcessor.class)
);
diff --git
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
index cfaa4e82d5..2f461a357e 100644
---
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
+++
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItNodeRestartsTest.java
@@ -25,13 +25,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.List;
+import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.StaticNodeFinder;
+import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.AfterEach;
@@ -45,6 +49,8 @@ class ItNodeRestartsTest {
/** Logger. */
private static final IgniteLogger LOG =
Loggers.forClass(ItNodeRestartsTest.class);
+ private final ClusterIdSupplier clusterIdSupplier = new
ConstantClusterIdSupplier(UUID.randomUUID());
+
/** Created {@link ClusterService}s. Needed for resource management. */
private List<ClusterService> services;
@@ -108,7 +114,13 @@ class ItNodeRestartsTest {
* @return Created Cluster Service.
*/
private ClusterService startNetwork(TestInfo testInfo, NetworkAddress
addr, NodeFinder nodeFinder) {
- ClusterService clusterService =
ClusterServiceTestUtils.clusterService(testInfo, addr.port(), nodeFinder);
+ ClusterService clusterService = ClusterServiceTestUtils.clusterService(
+ testInfo,
+ addr.port(),
+ nodeFinder,
+ new InMemoryStaleIds(),
+ clusterIdSupplier
+ );
assertThat(clusterService.startAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index 215dbcf699..ed65976b1b 100644
---
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -52,6 +52,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -60,12 +61,15 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.ChannelType;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.DefaultMessagingService;
import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
@@ -84,6 +88,7 @@ import
org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.internal.network.netty.ConnectorKey;
import org.apache.ignite.internal.network.netty.NettySender;
import
org.apache.ignite.internal.network.netty.OutgoingAcknowledgementSilencer;
+import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
import
org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
import
org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
import
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
@@ -109,6 +114,8 @@ class ItScaleCubeNetworkMessagingTest {
/** Message sent to establish a connection. */
private static final String TRAILBLAZER = "trailblazer";
+ private static final int INITIAL_PORT = 3344;
+
/**
* Test cluster.
*
@@ -1066,6 +1073,41 @@ class ItScaleCubeNetworkMessagingTest {
assertThat(sendFuture, willThrow(NodeStoppingException.class));
}
+ @Test
+ public void nodesWithDifferentClusterIdsCannotCommunicate() throws
Exception {
+ Map<NetworkAddress, ClusterIdSupplier> clusterIdSupplierMap = new
ConcurrentHashMap<>();
+
+ testCluster = new Cluster(2, testInfo, addr ->
clusterIdSupplierMap.computeIfAbsent(addr, k -> new
SameRandomClusterIdSupplier()));
+
+ assertThat(startAsync(new ComponentContext(), testCluster.members),
willCompleteSuccessfully());
+
+ assertFalse(
+ waitForCondition(testCluster::anyMembersSeeEachOther,
SECONDS.toMillis(1)),
+ "Nodes with different clusterIDs are able to communicate"
+ );
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void
nodeWithNonNullClusterIdCanCommunicateNodeWithNullClusterId(boolean nullFirst)
throws Exception {
+ Map<NetworkAddress, ClusterIdSupplier> clusterIdSupplierMap = new
ConcurrentHashMap<>();
+
+ testCluster = new Cluster(2, testInfo, addr ->
clusterIdSupplierMap.computeIfAbsent(addr, k -> {
+ if (nullFirst ^ (k.port() == INITIAL_PORT)) {
+ return ConstantClusterIdSupplier.withoutClusterId();
+ } else {
+ return new SameRandomClusterIdSupplier();
+ }
+ }));
+
+ assertThat(startAsync(new ComponentContext(), testCluster.members),
willCompleteSuccessfully());
+
+ assertTrue(
+ waitForCondition(testCluster::allMembersSeeEachOther,
SECONDS.toMillis(10)),
+ "Nodes are not able to communicate"
+ );
+ }
+
private void knockOutNode(String outcastName, boolean
closeConnectionsForcibly) throws InterruptedException {
CountDownLatch disappeared = new
CountDownLatch(testCluster.members.size() - 1);
@@ -1219,6 +1261,8 @@ class ItScaleCubeNetworkMessagingTest {
* Wrapper for a cluster.
*/
private static final class Cluster {
+ private static final ClusterIdSupplier normalClusterIdSupplier = new
SameRandomClusterIdSupplier();
+
/** Members of the cluster. */
final List<ClusterService> members;
@@ -1232,14 +1276,25 @@ class ItScaleCubeNetworkMessagingTest {
* @param testInfo Test info.
*/
Cluster(int numOfNodes, TestInfo testInfo) {
- int initialPort = 3344;
+ this(numOfNodes, testInfo, addr -> normalClusterIdSupplier);
+ }
+
+ /**
+ * Creates a test cluster with the given amount of members.
+ *
+ * @param numOfNodes Amount of cluster members.
+ * @param testInfo Test info.
+ * @param clusterIdSupplierFactory Allows to obtain a Supplier for
cluster ID by node address.
+ */
+ Cluster(int numOfNodes, TestInfo testInfo, Function<NetworkAddress,
ClusterIdSupplier> clusterIdSupplierFactory) {
+ int initialPort = INITIAL_PORT;
List<NetworkAddress> addresses = findLocalAddresses(initialPort,
initialPort + numOfNodes);
this.nodeFinder = new StaticNodeFinder(addresses);
members = addresses.stream()
- .map(addr -> startNode(testInfo, addr))
+ .map(addr -> startNode(testInfo, addr,
clusterIdSupplierFactory))
.collect(toUnmodifiableList());
}
@@ -1247,11 +1302,17 @@ class ItScaleCubeNetworkMessagingTest {
* Start cluster node.
*
* @param testInfo Test info.
- * @param addr Node address.
+ * @param addr Node address.
+ * @param clusterIdSupplierFactory Factory of cluster ID suppliers.
* @return Started cluster node.
*/
- private ClusterService startNode(TestInfo testInfo, NetworkAddress
addr) {
- return ClusterServiceTestUtils.clusterService(testInfo,
addr.port(), nodeFinder);
+ private ClusterService startNode(
+ TestInfo testInfo,
+ NetworkAddress addr,
+ Function<NetworkAddress, ClusterIdSupplier>
clusterIdSupplierFactory
+ ) {
+ ClusterIdSupplier clusterIdSupplier =
clusterIdSupplierFactory.apply(addr);
+ return ClusterServiceTestUtils.clusterService(testInfo,
addr.port(), nodeFinder, new InMemoryStaleIds(), clusterIdSupplier);
}
/**
@@ -1275,6 +1336,13 @@ class ItScaleCubeNetworkMessagingTest {
return totalMembersSeen == members.size() * members.size();
}
+ private boolean anyMembersSeeEachOther() {
+ int totalMembersSeen = members.stream()
+ .mapToInt(m -> m.topologyService().allMembers().size())
+ .sum();
+ return totalMembersSeen > members.size();
+ }
+
/**
* Stops the cluster.
*/
@@ -1299,4 +1367,13 @@ class ItScaleCubeNetworkMessagingTest {
abstract CompletableFuture<Void> send(MessagingService
messagingService, NetworkMessage message, ClusterNode recipient);
}
+
+ private static class SameRandomClusterIdSupplier implements
ClusterIdSupplier {
+ private final UUID clusterId = UUID.randomUUID();
+
+ @Override
+ public @Nullable UUID clusterId() {
+ return clusterId;
+ }
+ }
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClusterNodeMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClusterNodeMessage.java
index 4ba86b162d..88a5aca80b 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClusterNodeMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClusterNodeMessage.java
@@ -31,14 +31,19 @@ import org.apache.ignite.network.NetworkAddress;
*/
@Transferable(CLUSTER_NODE_MESSAGE)
public interface ClusterNodeMessage extends NetworkMessage, Serializable {
+ /** Node ID. */
String id();
+ /** Node name, aka consistent ID. */
String name();
+ /** Host (part of the {@link NetworkAddress} of the node. */
String host();
+ /** Port (part of the {@link NetworkAddress} of the node. */
int port();
+ /** Converts this message to the corresponding {@link ClusterNode}
instance. */
default ClusterNode asClusterNode() {
return new ClusterNodeImpl(id(), name(), new NetworkAddress(host(),
port()));
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 5a168e4efd..1f48b85e22 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.ChannelType;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.RecipientLeftException;
@@ -119,6 +120,8 @@ public class ConnectionManager implements
ChannelCreationListener {
/** Used to detect that a peer uses a stale ID. */
private final StaleIdDetector staleIdDetector;
+ private final ClusterIdSupplier clusterIdSupplier;
+
/** Factory producing {@link RecoveryClientHandshakeManager} instances. */
private final @Nullable RecoveryClientHandshakeManagerFactory
clientHandshakeManagerFactory;
@@ -150,6 +153,8 @@ public class ConnectionManager implements
ChannelCreationListener {
* @param consistentId Consistent id of this node.
* @param bootstrapFactory Bootstrap factory.
* @param staleIdDetector Detects stale member IDs.
+ * @param clusterIdSupplier Supplier of cluster ID.
+ * @param failureProcessor Used to fail the node if a critical failure
happens.
*/
public ConnectionManager(
NetworkView networkConfiguration,
@@ -157,6 +162,7 @@ public class ConnectionManager implements
ChannelCreationListener {
String consistentId,
NettyBootstrapFactory bootstrapFactory,
StaleIdDetector staleIdDetector,
+ ClusterIdSupplier clusterIdSupplier,
FailureProcessor failureProcessor
) {
this(
@@ -165,6 +171,7 @@ public class ConnectionManager implements
ChannelCreationListener {
consistentId,
bootstrapFactory,
staleIdDetector,
+ clusterIdSupplier,
null,
failureProcessor
);
@@ -178,7 +185,9 @@ public class ConnectionManager implements
ChannelCreationListener {
* @param consistentId Consistent id of this node.
* @param bootstrapFactory Bootstrap factory.
* @param staleIdDetector Detects stale member IDs.
+ * @param clusterIdSupplier Supplier of cluster ID.
* @param clientHandshakeManagerFactory Factory for {@link
RecoveryClientHandshakeManager} instances.
+ * @param failureProcessor Used to fail the node if a critical failure
happens.
*/
public ConnectionManager(
NetworkView networkConfiguration,
@@ -186,6 +195,7 @@ public class ConnectionManager implements
ChannelCreationListener {
String consistentId,
NettyBootstrapFactory bootstrapFactory,
StaleIdDetector staleIdDetector,
+ ClusterIdSupplier clusterIdSupplier,
@Nullable RecoveryClientHandshakeManagerFactory
clientHandshakeManagerFactory,
FailureProcessor failureProcessor
) {
@@ -193,6 +203,7 @@ public class ConnectionManager implements
ChannelCreationListener {
this.consistentId = consistentId;
this.bootstrapFactory = bootstrapFactory;
this.staleIdDetector = staleIdDetector;
+ this.clusterIdSupplier = clusterIdSupplier;
this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
this.networkConfiguration = networkConfiguration;
this.failureProcessor = failureProcessor;
@@ -495,6 +506,7 @@ public class ConnectionManager implements
ChannelCreationListener {
descriptorProvider,
bootstrapFactory,
staleIdDetector,
+ clusterIdSupplier,
this,
stopping::get,
failureProcessor
@@ -518,6 +530,7 @@ public class ConnectionManager implements
ChannelCreationListener {
descriptorProvider,
bootstrapFactory,
staleIdDetector,
+ clusterIdSupplier,
this,
stopping::get,
failureProcessor
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 080f21c01f..f86ff3e701 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
@@ -60,6 +61,7 @@ import
org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage
import
org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
import org.apache.ignite.internal.network.recovery.message.ProbeMessage;
import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
@@ -81,6 +83,8 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
/** Used to detect that a peer uses a stale ID. */
private final StaleIdDetector staleIdDetector;
+ private final ClusterIdSupplier clusterIdSupplier;
+
private final BooleanSupplier stopping;
/** Connection id. */
@@ -127,6 +131,7 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
RecoveryDescriptorProvider recoveryDescriptorProvider,
ChannelEventLoopsSource channelEventLoopsSource,
StaleIdDetector staleIdDetector,
+ ClusterIdSupplier clusterIdSupplier,
ChannelCreationListener channelCreationListener,
BooleanSupplier stopping,
FailureProcessor failureProcessor
@@ -136,6 +141,7 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
this.recoveryDescriptorProvider = recoveryDescriptorProvider;
this.channelEventLoopsSource = channelEventLoopsSource;
this.staleIdDetector = staleIdDetector;
+ this.clusterIdSupplier = clusterIdSupplier;
this.stopping = stopping;
this.failureProcessor = failureProcessor;
@@ -311,6 +317,12 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
return true;
}
+ if (clusterIdMismatch(message.serverClusterId(),
clusterIdSupplier.clusterId())) {
+ handleClusterIdMismatch(message);
+
+ return true;
+ }
+
if (stopping.getAsBoolean()) {
handleRefusalToEstablishConnectionDueToStopping(message);
@@ -327,6 +339,10 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
);
}
+ private static boolean clusterIdMismatch(@Nullable UUID serverClusterId,
@Nullable UUID clientClusterId) {
+ return serverClusterId != null && clientClusterId != null &&
!serverClusterId.equals(clientClusterId);
+ }
+
private void handleStaleServerId(HandshakeStartMessage msg) {
String message = msg.serverNode().name() + ":" + msg.serverNode().id()
+ " is stale, server should be restarted so that clients can
connect";
@@ -334,6 +350,14 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
sendRejectionMessageAndFailHandshake(message,
HandshakeRejectionReason.STALE_LAUNCH_ID, HandshakeException::new);
}
+ private void handleClusterIdMismatch(HandshakeStartMessage msg) {
+ String message = msg.serverNode().name() + ":" + msg.serverNode().id()
+ + " belongs to cluster " + msg.serverClusterId() + " which is
different from this one " + clusterIdSupplier.clusterId()
+ + ", connection rejected; should CMG/MG repair be finished?";
+
+ sendRejectionMessageAndFailHandshake(message,
HandshakeRejectionReason.CLUSTER_ID_MISMATCH, HandshakeException::new);
+ }
+
private void
handleRefusalToEstablishConnectionDueToStopping(HandshakeStartMessage msg) {
String message = msg.serverNode().name() + ":" + msg.serverNode().id()
+ " tried to establish a connection with " + localNode.name()
+ ", but it's stopping";
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index 8654a6eddb..c7a558dee0 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.failure.FailureType;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
@@ -99,6 +100,8 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
/** Used to detect that a peer uses a stale ID. */
private final StaleIdDetector staleIdDetector;
+ private final ClusterIdSupplier clusterIdSupplier;
+
private final BooleanSupplier stopping;
/** Recovery descriptor. */
@@ -120,6 +123,7 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
RecoveryDescriptorProvider recoveryDescriptorProvider,
ChannelEventLoopsSource channelEventLoopsSource,
StaleIdDetector staleIdDetector,
+ ClusterIdSupplier clusterIdSupplier,
ChannelCreationListener channelCreationListener,
BooleanSupplier stopping,
FailureProcessor failureProcessor
@@ -129,6 +133,7 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
this.recoveryDescriptorProvider = recoveryDescriptorProvider;
this.channelEventLoopsSource = channelEventLoopsSource;
this.staleIdDetector = staleIdDetector;
+ this.clusterIdSupplier = clusterIdSupplier;
this.stopping = stopping;
this.failureProcessor = failureProcessor;
@@ -166,6 +171,7 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
public void onConnectionOpen() {
HandshakeStartMessage handshakeStartMessage =
messageFactory.handshakeStartMessage()
.serverNode(HandshakeManagerUtils.clusterNodeToMessage(localNode))
+ .serverClusterId(clusterIdSupplier.clusterId())
.build();
ChannelFuture sendFuture = channel.writeAndFlush(new
OutNetworkObject(handshakeStartMessage, emptyList(), false));
@@ -341,19 +347,17 @@ public class RecoveryServerHandshakeManager implements
HandshakeManager {
}
private void onHandshakeRejectedMessage(HandshakeRejectedMessage msg) {
- boolean ignorable = stopping.getAsBoolean() ||
!msg.reason().critical();
-
- if (ignorable) {
- LOG.debug("Handshake rejected by client: {}", msg.message());
- } else {
+ if (!stopping.getAsBoolean() && msg.reason().logAsWarn()) {
LOG.warn("Handshake rejected by client: {}", msg.message());
+ } else {
+ LOG.debug("Handshake rejected by client: {}", msg.message());
}
HandshakeException err = new HandshakeException(msg.message());
handshakeCompleteFuture.completeExceptionally(err);
- if (!ignorable) {
+ if (!stopping.getAsBoolean() && msg.reason().critical()) {
failureProcessor.process(new
FailureContext(FailureType.CRITICAL_ERROR, err));
}
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectionReason.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectionReason.java
index b70fea80aa..aabd08b92a 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectionReason.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectionReason.java
@@ -30,7 +30,11 @@ public enum HandshakeRejectionReason {
*/
STALE_LAUNCH_ID,
/** The sender has detected a clinch and decided to terminate this
handshake in favor of the competitor. */
- CLINCH;
+ CLINCH,
+ /**
+ * Cluster ID of the sender does not match the cluster ID of the
counterpart.
+ */
+ CLUSTER_ID_MISMATCH;
/**
* Returns {@code true} iff the rejection is not expected and should be
treated as a critical failure (requiring
@@ -39,4 +43,11 @@ public enum HandshakeRejectionReason {
public boolean critical() {
return this == STALE_LAUNCH_ID;
}
+
+ /**
+ * Returns {@code true} iff the rejection should be logged at a WARN level.
+ */
+ public boolean logAsWarn() {
+ return this == STALE_LAUNCH_ID || this == CLUSTER_ID_MISMATCH;
+ }
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
index b88c2c3099..75108a2406 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.network.recovery.message;
+import java.util.UUID;
import org.apache.ignite.internal.network.NetworkMessageTypes;
import org.apache.ignite.internal.network.annotations.Transferable;
import org.apache.ignite.internal.network.message.ClusterNodeMessage;
+import org.jetbrains.annotations.Nullable;
/**
* Handshake start message, contains info about the node.
@@ -29,4 +31,8 @@ import
org.apache.ignite.internal.network.message.ClusterNodeMessage;
public interface HandshakeStartMessage extends InternalMessage {
/** Returns the server node that sends this. */
ClusterNodeMessage serverNode();
+
+ /** ID of the cluster to which the server node belongs ({@code null} if
it's not initialized yet. */
+ @Nullable
+ UUID serverClusterId();
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
index 92d90576f0..316676e956 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.AbstractClusterService;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.DefaultMessagingService;
@@ -86,6 +87,7 @@ public class ScaleCubeClusterServiceFactory {
* @param nettyBootstrapFactory Bootstrap factory.
* @param serializationRegistry Registry used for serialization.
* @param staleIds Used to update/detect whether a node has left the
physical topology.
+ * @param clusterIdSupplier Supplier for cluster ID.
* @param criticalWorkerRegistry Used to register critical threads managed
by the new service and its components.
* @param failureProcessor Failure processor that is used to handle
critical errors.
* @return New cluster service.
@@ -96,6 +98,7 @@ public class ScaleCubeClusterServiceFactory {
NettyBootstrapFactory nettyBootstrapFactory,
MessageSerializationRegistry serializationRegistry,
StaleIds staleIds,
+ ClusterIdSupplier clusterIdSupplier,
CriticalWorkerRegistry criticalWorkerRegistry,
FailureProcessor failureProcessor
) {
@@ -146,6 +149,7 @@ public class ScaleCubeClusterServiceFactory {
consistentId,
nettyBootstrapFactory,
staleIds,
+ clusterIdSupplier,
failureProcessor
);
this.connectionMgr = connectionMgr;
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
index bbdbd49f53..c808efed53 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/DefaultMessagingServiceTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.network;
+import static java.util.UUID.randomUUID;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -117,17 +118,19 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
private final MessageSerializationRegistry messageSerializationRegistry =
defaultSerializationRegistry();
private final ClusterNode senderNode = new ClusterNodeImpl(
- UUID.randomUUID().toString(),
+ randomUUID().toString(),
"sender",
new NetworkAddress("localhost", SENDER_PORT)
);
private final ClusterNode receiverNode = new ClusterNodeImpl(
- UUID.randomUUID().toString(),
+ randomUUID().toString(),
"receiver",
new NetworkAddress("localhost", RECEIVER_PORT)
);
+ private final UUID clusterId = randomUUID();
+
@BeforeEach
void setUp() {
lenient().when(topologyService.getByConsistentId(eq(senderNode.name()))).thenReturn(senderNode);
@@ -475,6 +478,7 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
MessageSerializationRegistry registry
) {
StaleIdDetector staleIdDetector = new AllIdsAreFresh();
+ ClusterIdSupplier clusterIdSupplier = new
ConstantClusterIdSupplier(clusterId);
ClassDescriptorRegistry classDescriptorRegistry = new
ClassDescriptorRegistry();
ClassDescriptorFactory classDescriptorFactory = new
ClassDescriptorFactory(classDescriptorRegistry);
@@ -506,7 +510,8 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
node.name(),
bootstrapFactory,
staleIdDetector,
- clientHandshakeManagerFactoryAdding(beforeHandshake,
bootstrapFactory, staleIdDetector),
+ clusterIdSupplier,
+ clientHandshakeManagerFactoryAdding(beforeHandshake,
bootstrapFactory, staleIdDetector, clusterIdSupplier),
failureProcessor
);
connectionManager.start();
@@ -520,7 +525,8 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
private RecoveryClientHandshakeManagerFactory
clientHandshakeManagerFactoryAdding(
Runnable beforeHandshake,
NettyBootstrapFactory bootstrapFactory,
- StaleIdDetector staleIdDetector
+ StaleIdDetector staleIdDetector,
+ ClusterIdSupplier clusterIdSupplier
) {
return new RecoveryClientHandshakeManagerFactory() {
@Override
@@ -535,6 +541,7 @@ class DefaultMessagingServiceTest extends
BaseIgniteAbstractTest {
recoveryDescriptorProvider,
bootstrapFactory,
staleIdDetector,
+ clusterIdSupplier,
channel -> {},
() -> false,
failureProcessor
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
index fd97affbb1..25d0af1cad 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -37,7 +37,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
@@ -88,6 +90,8 @@ public class RecoveryHandshakeTest extends
BaseIgniteAbstractTest {
/** Test message factory. */
private static final TestMessagesFactory TEST_MESSAGES_FACTORY = new
TestMessagesFactory();
+ private final ClusterIdSupplier clusterIdSupplier = new
ConstantClusterIdSupplier(UUID.randomUUID());
+
@Test
public void testHandshake() throws Exception {
RecoveryDescriptorProvider clientRecovery =
createRecoveryDescriptorProvider();
@@ -715,6 +719,7 @@ public class RecoveryHandshakeTest extends
BaseIgniteAbstractTest {
provider,
() -> List.of(clientSideChannel.eventLoop()),
staleIdDetector,
+ clusterIdSupplier,
channel -> {},
() -> false,
mock(FailureProcessor.class)
@@ -750,6 +755,7 @@ public class RecoveryHandshakeTest extends
BaseIgniteAbstractTest {
provider,
() -> List.of(serverSideChannel.eventLoop()),
staleIdDetector,
+ clusterIdSupplier,
channel -> {},
() -> false,
mock(FailureProcessor.class)
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
index 260efc75c9..6ea14966bb 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManagerTest.java
@@ -50,6 +50,7 @@ import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
import
org.apache.ignite.internal.network.handshake.ChannelAlreadyExistsException;
@@ -90,6 +91,9 @@ class RecoveryClientHandshakeManagerTest extends
BaseIgniteAbstractTest {
private static final NetworkMessagesFactory MESSAGE_FACTORY = new
NetworkMessagesFactory();
+ private static final UUID CORRECT_CLUSTER_ID = new UUID(11, 12);
+ private static final UUID WRONG_CLUSTER_ID = new UUID(13, 14);
+
@Mock
private Channel thisChannel;
@Mock
@@ -188,6 +192,7 @@ class RecoveryClientHandshakeManagerTest extends
BaseIgniteAbstractTest {
recoveryDescriptorProvider,
() -> List.of(thisChannel.eventLoop()),
new AllIdsAreFresh(),
+ new ConstantClusterIdSupplier(CORRECT_CLUSTER_ID),
channelCreationListener,
stopping,
failureProcessor
@@ -199,6 +204,10 @@ class RecoveryClientHandshakeManagerTest extends
BaseIgniteAbstractTest {
}
private static HandshakeStartMessage handshakeStartMessageFrom(UUID
serverLaunchId) {
+ return handshakeStartMessageFrom(serverLaunchId, CORRECT_CLUSTER_ID);
+ }
+
+ private static HandshakeStartMessage handshakeStartMessageFrom(UUID
serverLaunchId, UUID serverClusterId) {
return MESSAGE_FACTORY.handshakeStartMessage()
.serverNode(
MESSAGE_FACTORY.clusterNodeMessage()
@@ -208,6 +217,7 @@ class RecoveryClientHandshakeManagerTest extends
BaseIgniteAbstractTest {
.port(PORT)
.build()
)
+ .serverClusterId(serverClusterId)
.build();
}
@@ -301,4 +311,20 @@ class RecoveryClientHandshakeManagerTest extends
BaseIgniteAbstractTest {
assertThat(recoveryDescriptor.holder(), is(nullValue()));
}
+
+ @Test
+ void failsHandshakeIfServerClusterIdDiffersFromOurs() {
+ RecoveryClientHandshakeManager manager =
clientHandshakeManager(LOWER_ID);
+ CompletableFuture<NettySender> localHandshakeFuture =
manager.localHandshakeFuture();
+ CompletionStage<NettySender> finalHandshakeFuture =
manager.finalHandshakeFuture();
+
+ manager.onMessage(handshakeStartMessageFrom(HIGHER_ID,
WRONG_CLUSTER_ID));
+
+
assertHandshakeRejectedMessageIsSentWithReason(HandshakeRejectionReason.CLUSTER_ID_MISMATCH);
+
+ assertThat(localHandshakeFuture, willThrow(HandshakeException.class));
+ assertThat(finalHandshakeFuture.toCompletableFuture(),
willThrow(HandshakeException.class));
+
+ assertThat(recoveryDescriptor.holder(), is(nullValue()));
+ }
}
diff --git
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
index 088faf898b..a63f8fcd43 100644
---
a/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
+++
b/modules/network/src/test/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManagerTest.java
@@ -49,6 +49,7 @@ import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.network.ClusterNodeImpl;
+import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.OutNetworkObject;
import org.apache.ignite.internal.network.handshake.HandshakeException;
@@ -85,6 +86,9 @@ class RecoveryServerHandshakeManagerTest extends
BaseIgniteAbstractTest {
private static final NetworkMessagesFactory MESSAGE_FACTORY = new
NetworkMessagesFactory();
+ private static final UUID CORRECT_CLUSTER_ID = new UUID(11, 12);
+ private static final UUID WRONG_CLUSTER_ID = new UUID(13, 14);
+
@Mock
private Channel channel;
@@ -179,6 +183,7 @@ class RecoveryServerHandshakeManagerTest extends
BaseIgniteAbstractTest {
recoveryDescriptorProvider,
() -> List.of(channel.eventLoop()),
new AllIdsAreFresh(),
+ new ConstantClusterIdSupplier(CORRECT_CLUSTER_ID),
channelCreationListener,
stopping,
failureProcessor
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/ConstantClusterIdSupplier.java
similarity index 53%
copy from
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
copy to
modules/network/src/testFixtures/java/org/apache/ignite/internal/network/ConstantClusterIdSupplier.java
index b88c2c3099..2c362b5598 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/ConstantClusterIdSupplier.java
@@ -15,18 +15,31 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network.recovery.message;
+package org.apache.ignite.internal.network;
-import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.network.message.ClusterNodeMessage;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
/**
- * Handshake start message, contains info about the node.
- * This message is sent from a server to a client at the connection opening.
+ * Always returns the same cluster ID fixed at construction.
*/
-@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends InternalMessage {
- /** Returns the server node that sends this. */
- ClusterNodeMessage serverNode();
+public final class ConstantClusterIdSupplier implements ClusterIdSupplier {
+ /** Cluster ID handed out by {@link #clusterId()}; {@code null} when there is none. */
+ @Nullable
+ private final UUID clusterId;
+
+ /**
+ * Returns a supplier that never has any cluster ID (always returns {@code null}).
+ *
+ * @return Supplier whose {@link #clusterId()} always returns {@code null}.
+ */
+ public static ClusterIdSupplier withoutClusterId() {
+ return new ConstantClusterIdSupplier(null);
+ }
+
+ /**
+ * Creates a supplier that always returns the given cluster ID.
+ *
+ * @param clusterId Cluster ID to return, or {@code null} if the supplier should never have a cluster ID.
+ */
+ public ConstantClusterIdSupplier(@Nullable UUID clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ @Override
+ public @Nullable UUID clusterId() {
+ return clusterId;
+ }
}
diff --git
a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/utils/ClusterServiceTestUtils.java
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/utils/ClusterServiceTestUtils.java
index 1b1fd77950..8ac06f97b1 100644
---
a/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/utils/ClusterServiceTestUtils.java
+++
b/modules/network/src/testFixtures/java/org/apache/ignite/internal/network/utils/ClusterServiceTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.network.utils;
import static java.util.stream.Collectors.toUnmodifiableList;
+import static
org.apache.ignite.internal.network.ConstantClusterIdSupplier.withoutClusterId;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.handlers.NoOpFailureHandler;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.AbstractClusterService;
+import org.apache.ignite.internal.network.ClusterIdSupplier;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.MessageSerializationRegistryImpl;
import org.apache.ignite.internal.network.NettyBootstrapFactory;
@@ -97,9 +99,30 @@ public class ClusterServiceTestUtils {
* @param staleIds Used to track stale launch IDs.
*/
public static ClusterService clusterService(TestInfo testInfo, int port,
NodeFinder nodeFinder, StaleIds staleIds) {
+ // No cluster ID for this node: the supplier from withoutClusterId() always returns null.
+ return clusterService(testInfo, port, nodeFinder, staleIds,
withoutClusterId());
+ }
+
+ /**
+ * Creates a cluster service and required node configuration manager
beneath it. Populates node configuration with specified port.
+ * Manages configuration manager lifecycle: on cluster service start
starts node configuration manager, on cluster service stop - stops
+ * node configuration manager.
+ *
+ * @param testInfo Test info.
+ * @param port Local port.
+ * @param nodeFinder Node finder.
+ * @param staleIds Used to track stale launch IDs.
+ * @param clusterIdSupplier Supplier of the cluster ID of the created node (may supply {@code null}).
+ * @return Cluster service instance.
+ */
+ public static ClusterService clusterService(
+ TestInfo testInfo,
+ int port,
+ NodeFinder nodeFinder,
+ StaleIds staleIds,
+ ClusterIdSupplier clusterIdSupplier
+ ) {
String nodeName = testNodeName(testInfo, port);
- return clusterService(nodeName, port, nodeFinder, staleIds);
+ return clusterService(nodeName, port, nodeFinder, staleIds,
clusterIdSupplier);
}
/**
@@ -111,7 +134,7 @@ public class ClusterServiceTestUtils {
* @return Cluster service instance.
*/
public static ClusterService clusterService(String nodeName, int port,
NodeFinder nodeFinder) {
- return clusterService(nodeName, port, nodeFinder, new
InMemoryStaleIds());
+ // Fresh in-memory stale IDs and no cluster ID (withoutClusterId() always supplies null).
+ return clusterService(nodeName, port, nodeFinder, new
InMemoryStaleIds(), withoutClusterId());
}
/**
@@ -121,9 +144,16 @@ public class ClusterServiceTestUtils {
* @param port Local port.
* @param nodeFinder Node finder.
* @param staleIds Used to track stale launch IDs.
+ * @param clusterIdSupplier Supplier of cluster ID.
* @return Cluster service instance.
*/
- private static ClusterService clusterService(String nodeName, int port,
NodeFinder nodeFinder, StaleIds staleIds) {
+ private static ClusterService clusterService(
+ String nodeName,
+ int port,
+ NodeFinder nodeFinder,
+ StaleIds staleIds,
+ ClusterIdSupplier clusterIdSupplier
+ ) {
ConfigurationManager nodeConfigurationMgr = new ConfigurationManager(
Collections.singleton(NetworkConfiguration.KEY),
new TestConfigurationStorage(ConfigurationType.LOCAL),
@@ -143,6 +173,7 @@ public class ClusterServiceTestUtils {
bootstrapFactory,
serializationRegistry,
staleIds,
+ clusterIdSupplier,
new NoOpCriticalWorkerRegistry(),
new FailureProcessor(new NoOpFailureHandler())
);
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index e9e8b15460..0a960b7f10 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
+import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
@@ -125,6 +126,7 @@ import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import
org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileConfigurationSchema;
import
org.apache.ignite.internal.pagememory.configuration.schema.VolatilePageMemoryProfileConfigurationSchema;
@@ -907,10 +909,14 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
new TestConfigurationValidator()
);
+ var clusterIdHolder = new ClusterIdHolder();
+
clusterService = ClusterServiceTestUtils.clusterService(
testInfo,
networkAddress.port(),
- finder
+ finder,
+ new InMemoryStaleIds(),
+ clusterIdHolder
);
lockManager = new HeapLockManager();
@@ -951,7 +957,8 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
logicalTopology,
clusterManagementConfiguration,
new
NodeAttributesCollector(nodeAttributesConfigurations.get(idx),
storageConfiguration),
- failureProcessor
+ failureProcessor,
+ clusterIdHolder
);
LogicalTopologyServiceImpl logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
index 85a13338a3..e4416e34f6 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItTruncateSuffixAndRestartTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.raft;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.IntStream.range;
+import static
org.apache.ignite.internal.network.ConstantClusterIdSupplier.withoutClusterId;
import static
org.apache.ignite.internal.network.configuration.NetworkConfigurationSchema.DEFAULT_PORT;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static
org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds;
@@ -180,8 +181,10 @@ public class ItTruncateSuffixAndRestartTest extends
BaseIgniteAbstractTest {
nettyBootstrapFactory,
defaultSerializationRegistry(),
new InMemoryStaleIds(),
+ withoutClusterId(),
new NoOpCriticalWorkerRegistry(),
- mock(FailureProcessor.class));
+ mock(FailureProcessor.class)
+ );
assertThat(clusterSvc.startAsync(new ComponentContext()),
willCompleteSuccessfully());
cleanup.add(() -> assertThat(clusterSvc.stopAsync(new
ComponentContext()), willCompleteSuccessfully()));
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index a2ad98eb88..f8290bffeb 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.configuration.annotation.ConfigurationRoot;
import org.apache.ignite.configuration.annotation.ConfigurationType;
import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
@@ -189,7 +190,8 @@ public class ItDistributedConfigurationPropertiesTest
extends BaseIgniteAbstract
logicalTopology,
clusterManagementConfiguration,
new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
- failureProcessor
+ failureProcessor,
+ new ClusterIdHolder()
);
var logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 4ca6f6c31d..238e296b81 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -32,6 +32,7 @@ import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
@@ -161,7 +162,8 @@ public class ItDistributedConfigurationStorageTest extends
BaseIgniteAbstractTes
logicalTopology,
clusterManagementConfiguration,
new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
- failureProcessor
+ failureProcessor,
+ new ClusterIdHolder()
);
var logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index e99441fae7..b5de12d872 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -96,6 +96,7 @@ import
org.apache.ignite.internal.catalog.commands.CreateTableCommand;
import org.apache.ignite.internal.catalog.commands.TableHashPrimaryKey;
import
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
+import org.apache.ignite.internal.cluster.management.ClusterIdService;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
@@ -328,6 +329,10 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
VaultManager vault = createVault(dir);
+ var clusterStateStorage = new
RocksDbClusterStateStorage(dir.resolve("cmg"), name);
+
+ var clusterIdService = new ClusterIdService(clusterStateStorage);
+
ConfigurationModules modules = loadConfigurationModules(log,
Thread.currentThread().getContextClassLoader());
Path configFile =
workDir.resolve(TestIgnitionManager.DEFAULT_CONFIG_NAME);
@@ -374,6 +379,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
nettyBootstrapFactory,
defaultSerializationRegistry(),
new VaultStaleIds(vault),
+ clusterIdService,
workerRegistry,
failureProcessor
);
@@ -395,8 +401,6 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
raftGroupEventsClientListener
);
- var clusterStateStorage = new
RocksDbClusterStateStorage(dir.resolve("cmg"), name);
-
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var clusterInitializer = new ClusterInitializer(
@@ -415,7 +419,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
clusterManagementConfiguration,
new NodeAttributesCollector(nodeAttributes,
nodeCfgMgr.configurationRegistry().getConfiguration(StorageConfiguration.KEY)),
- failureProcessor
+ failureProcessor,
+ clusterIdService
);
LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier
@@ -747,11 +752,12 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
threadPoolsManager,
failureProcessor,
workerRegistry,
+ clusterStateStorage,
+ clusterIdService,
nettyBootstrapFactory,
nettyWorkersRegistrar,
clusterSvc,
raftMgr,
- clusterStateStorage,
cmgManager,
replicaMgr,
txManager,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 949ebf20d5..7ec71431aa 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -68,6 +68,7 @@ import
org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner;
import
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
import org.apache.ignite.internal.catalog.sql.IgniteCatalogSqlImpl;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
+import org.apache.ignite.internal.cluster.management.ClusterIdService;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.ClusterState;
@@ -279,6 +280,8 @@ public class IgniteImpl implements Ignite {
/** Configuration manager that handles node (local) configuration. */
private final ConfigurationManager nodeCfgMgr;
+ /** Cluster ID service, created on top of the cluster state storage. */
+ private final ClusterIdService clusterIdService;
+
/** Cluster service (cluster network manager). */
private final ClusterService clusterSvc;
@@ -429,6 +432,13 @@ public class IgniteImpl implements Ignite {
metricManager = new MetricManagerImpl();
+ // TODO: IGNITE-16841 - use common RocksDB instance to store cluster
state as well.
+ Path cmgDbPath = cmgDbPath(workDir);
+
+ clusterStateStorage = new RocksDbClusterStateStorage(cmgDbPath, name);
+
+ clusterIdService = new ClusterIdService(clusterStateStorage);
+
ConfigurationModules modules =
loadConfigurationModules(serviceProviderClassLoader);
ConfigurationTreeGenerator localConfigurationGenerator = new
ConfigurationTreeGenerator(
@@ -485,6 +495,7 @@ public class IgniteImpl implements Ignite {
nettyBootstrapFactory,
serializationRegistry,
new VaultStaleIds(vaultMgr),
+ clusterIdService,
criticalWorkerRegistry,
failureProcessor
);
@@ -523,11 +534,6 @@ public class IgniteImpl implements Ignite {
message -> threadPoolsManager.partitionOperationsExecutor()
);
- // TODO: IGNITE-16841 - use common RocksDB instance to store cluster
state as well.
- Path cmgDbPath = cmgDbPath(workDir);
-
- clusterStateStorage = new RocksDbClusterStateStorage(cmgDbPath, name);
-
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
ConfigurationTreeGenerator distributedConfigurationGenerator = new
ConfigurationTreeGenerator(
@@ -571,7 +577,8 @@ public class IgniteImpl implements Ignite {
validationManager,
nodeConfigRegistry.getConfiguration(ClusterManagementConfiguration.KEY),
nodeAttributesCollector,
- failureProcessor
+ failureProcessor,
+ clusterIdService
);
logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgMgr);
@@ -1104,6 +1111,8 @@ public class IgniteImpl implements Ignite {
threadPoolsManager,
clockWaiter,
failureProcessor,
+ clusterStateStorage,
+ clusterIdService,
criticalWorkerRegistry,
nettyBootstrapFactory,
nettyWorkersRegistrar,
@@ -1111,7 +1120,6 @@ public class IgniteImpl implements Ignite {
restComponent,
logStorageFactory,
raftMgr,
- clusterStateStorage,
cmgMgr,
lowWatermark
).thenRun(() -> {
@@ -1642,6 +1650,11 @@ public class IgniteImpl implements Ignite {
return clusterSvc;
}
+ /** Returns cluster ID service. */
+ @TestOnly
+ public ClusterIdService clusterIdService() {
+ return clusterIdService;
+ }
+
/** Returns resources registry. */
@TestOnly
public RemotelyTriggeredResourceRegistry resourcesRegistry() {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 718d77b6b6..4b68246985 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -101,6 +101,7 @@ import
org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
+import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.NodeAttributesCollector;
@@ -153,6 +154,7 @@ import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.DefaultMessagingService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
+import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import
org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryProfileConfigurationSchema;
import
org.apache.ignite.internal.pagememory.configuration.schema.VolatilePageMemoryProfileConfigurationSchema;
@@ -1094,6 +1096,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
vaultManager = createVault(dir);
+ var clusterIdService = new ClusterIdHolder();
+
nodeCfgGenerator = new ConfigurationTreeGenerator(
List.of(
NetworkConfiguration.KEY,
@@ -1126,7 +1130,9 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
clusterService = ClusterServiceTestUtils.clusterService(
testInfo,
addr.port(),
- finder
+ finder,
+ new InMemoryStaleIds(),
+ clusterIdService
);
lockManager = new HeapLockManager();
@@ -1169,7 +1175,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
logicalTopology,
clusterManagementConfiguration,
new NodeAttributesCollector(nodeAttributes,
storageConfiguration),
- failureProcessor
+ failureProcessor,
+ clusterIdService
);
LogicalTopologyServiceImpl logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgManager);