This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e25398cb1c IGNITE-23223 Obtain clusterId from cluster state in
LogicalTopology (#4410)
e25398cb1c is described below
commit e25398cb1c411ed0f304baa0fa233ef84d1dcd0e
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Sep 18 17:55:23 2024 +0400
IGNITE-23223 Obtain clusterId from cluster state in LogicalTopology (#4410)
---
.../management/raft/ItCmgRaftServiceTest.java | 7 ++---
.../management/ClusterManagementGroupManager.java | 7 ++---
.../management/raft/CmgRaftGroupListener.java | 10 +++++++-
.../management/topology/LogicalTopologyImpl.java | 30 +++++++++++++++++-----
.../management/raft/CmgRaftGroupListenerTest.java | 26 ++++++++++++++++---
.../topology/LogicalTopologyImplTest.java | 5 ++--
.../internal/cluster/management/MockNode.java | 3 +--
.../management/raft/TestClusterStateStorage.java | 26 +++++++++++++++++++
...niteDistributionZoneManagerNodeRestartTest.java | 6 ++---
.../BaseDistributionZoneManagerTest.java | 5 ++--
.../ItMetaStorageMultipleNodesAbstractTest.java | 3 +--
.../metastorage/impl/ItMetaStorageWatchTest.java | 3 +--
.../replicator/ItReplicaLifecycleTest.java | 3 +--
.../ItDistributedConfigurationPropertiesTest.java | 3 +--
.../ItDistributedConfigurationStorageTest.java | 3 +--
.../runner/app/ItIgniteNodeRestartTest.java | 2 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 11 +++++---
.../internal/disaster/system/ClusterIdService.java | 3 +++
.../system/SystemDisasterRecoveryStorage.java | 8 +++++-
.../rebalance/ItRebalanceDistributedTest.java | 2 +-
20 files changed, 118 insertions(+), 48 deletions(-)
diff --git
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 2209a20390..6f4d544269 100644
---
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -40,6 +40,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.ClusterTag;
import org.apache.ignite.internal.cluster.management.CmgGroupId;
@@ -59,7 +60,6 @@ import
org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.properties.IgniteProductVersion;
@@ -120,7 +120,7 @@ public class ItCmgRaftServiceTest extends
BaseIgniteAbstractTest {
workingDir.raftLogPath()
);
this.raftManager = TestLozaFactory.create(clusterService,
raftConfiguration, new HybridClockImpl());
- this.logicalTopology = new
LogicalTopologyImpl(clusterStateStorage, new ConstantClusterIdSupplier());
+ this.logicalTopology = new
LogicalTopologyImpl(clusterStateStorage);
}
void start() {
@@ -156,7 +156,8 @@ public class ItCmgRaftServiceTest extends
BaseIgniteAbstractTest {
clusterStateStorageMgr,
logicalTopology,
new
ValidationManager(clusterStateStorageMgr, logicalTopology),
- term -> {}
+ term -> {},
+ new ClusterIdHolder()
),
RaftGroupEventsListener.noopLsnr,
RaftGroupOptionsConfigHelper.configureProperties(partitionsLogStorageFactory,
workingDir.metaPath())
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 08ca0d36c9..92d694a38f 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -528,8 +528,6 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
localStateStorage.saveLocalState(localState);
- clusterIdStore.clusterId(state.clusterTag().clusterId());
-
return joinCluster(service, state.clusterTag());
});
}
@@ -785,7 +783,8 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
clusterStateStorageMgr,
logicalTopology,
validationManager,
- this::onLogicalTopologyChanged
+ this::onLogicalTopologyChanged,
+ clusterIdStore
),
this::onElectedAsLeader,
raftGroupOptionsConfigurer
@@ -829,8 +828,6 @@ public class ClusterManagementGroupManager extends
AbstractEventProducer<Cluster
localStateStorage.saveLocalState(localState);
- clusterIdStore.clusterId(state.clusterTag().clusterId());
-
return joinCluster(service, state.clusterTag());
});
}
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
index acd87cb1b4..17647d659c 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.cluster.management.ClusterIdStore;
import org.apache.ignite.internal.cluster.management.ClusterState;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import
org.apache.ignite.internal.cluster.management.raft.commands.ChangeMetastorageNodesCommand;
@@ -75,6 +76,8 @@ public class CmgRaftGroupListener implements
RaftGroupListener {
private final LongConsumer onLogicalTopologyChanged;
+ private final ClusterIdStore clusterIdStore;
+
private final CmgMessagesFactory cmgMessagesFactory = new
CmgMessagesFactory();
/**
@@ -84,17 +87,20 @@ public class CmgRaftGroupListener implements
RaftGroupListener {
* @param logicalTopology Logical topology that will be updated by this
listener.
* @param validationManager Validator for cluster nodes.
* @param onLogicalTopologyChanged Callback invoked (with the
corresponding RAFT term) when logical topology gets changed.
+ * @param clusterIdStore Used to store cluster ID when init command is executed.
*/
public CmgRaftGroupListener(
ClusterStateStorageManager storageManager,
LogicalTopology logicalTopology,
ValidationManager validationManager,
- LongConsumer onLogicalTopologyChanged
+ LongConsumer onLogicalTopologyChanged,
+ ClusterIdStore clusterIdStore
) {
this.storageManager = storageManager;
this.logicalTopology = logicalTopology;
this.validationManager = validationManager;
this.onLogicalTopologyChanged = onLogicalTopologyChanged;
+ this.clusterIdStore = clusterIdStore;
}
@Override
@@ -199,6 +205,8 @@ public class CmgRaftGroupListener implements
RaftGroupListener {
if (state == null) {
storageManager.putClusterState(command.clusterState());
+
clusterIdStore.clusterId(command.clusterState().clusterTag().clusterId());
+
return command.clusterState();
} else {
ValidationResult validationResult =
ValidationManager.validateState(
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
index ec7ab64b8b..52274ee983 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
@@ -32,13 +32,15 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
+import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import
org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.network.ClusterIdSupplier;
+import org.jetbrains.annotations.Nullable;
/**
* Implementation of {@link LogicalTopology}.
@@ -51,13 +53,17 @@ public class LogicalTopologyImpl implements LogicalTopology
{
private final ClusterStateStorage storage;
- private final ClusterIdSupplier clusterIdSupplier;
+ private final ClusterStateStorageManager clusterStateStorageManager;
private final List<LogicalTopologyEventListener> listeners = new
CopyOnWriteArrayList<>();
- public LogicalTopologyImpl(ClusterStateStorage storage, ClusterIdSupplier
clusterIdSupplier) {
+ private volatile @Nullable UUID clusterId;
+
+ /** Constructor. */
+ public LogicalTopologyImpl(ClusterStateStorage storage) {
this.storage = storage;
- this.clusterIdSupplier = clusterIdSupplier;
+
+ clusterStateStorageManager = new ClusterStateStorageManager(storage);
}
@Override
@@ -128,11 +134,21 @@ public class LogicalTopologyImpl implements
LogicalTopology {
}
private UUID requiredClusterId() {
- UUID clusterId = clusterIdSupplier.clusterId();
+ UUID localClusterId = clusterId;
+ if (localClusterId != null) {
+ return localClusterId;
+ }
+
+ // It is safe to read cluster state from the CMG storage as it was either restored from a snapshot (and has cluster state),
+ // or init command was executed before current command and put cluster state to the CMG storage.
+ ClusterState clusterState =
clusterStateStorageManager.getClusterState();
+ assert clusterState != null : "clusterState cannot be null when
commands are already being executed by the CMG state machine";
- assert clusterId != null : "clusterId cannot be null when commands are
already being executed by the CMG state machine";
+ // clusterId cannot have different non-null values for the same node during the same launch, so we don't need to synchronize.
+ localClusterId = clusterState.clusterTag().clusterId();
+ clusterId = localClusterId;
- return clusterId;
+ return localClusterId;
}
private void saveSnapshotToStorage(LogicalTopologySnapshot newTopology) {
diff --git
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
index 6f7131ed05..aaf888b276 100644
---
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
+++
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
@@ -43,6 +43,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.LongConsumer;
+import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.ClusterTag;
import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
@@ -52,7 +53,6 @@ import
org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.manager.ComponentContext;
-import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.service.CommandClosure;
@@ -77,7 +77,7 @@ public class CmgRaftGroupListenerTest extends
BaseIgniteAbstractTest {
private LongConsumer onLogicalTopologyChanged;
@Spy
- private final LogicalTopology logicalTopology = new
LogicalTopologyImpl(storage, new ConstantClusterIdSupplier());
+ private final LogicalTopology logicalTopology = new
LogicalTopologyImpl(storage);
private CmgRaftGroupListener listener;
@@ -96,6 +96,8 @@ public class CmgRaftGroupListenerTest extends
BaseIgniteAbstractTest {
private final ClusterNodeMessage node =
msgFactory.clusterNodeMessage().id("foo").name("bar").host("localhost").port(666).build();
+ private final ClusterIdHolder clusterIdHolder = new ClusterIdHolder();
+
@BeforeEach
void setUp() {
assertThat(storage.startAsync(new ComponentContext()),
willCompleteSuccessfully());
@@ -103,7 +105,13 @@ public class CmgRaftGroupListenerTest extends
BaseIgniteAbstractTest {
var clusterStateStorageMgr = new ClusterStateStorageManager(storage);
var validationManager = new ValidationManager(clusterStateStorageMgr,
logicalTopology);
- listener = new CmgRaftGroupListener(clusterStateStorageMgr,
logicalTopology, validationManager, onLogicalTopologyChanged);
+ listener = new CmgRaftGroupListener(
+ clusterStateStorageMgr,
+ logicalTopology,
+ validationManager,
+ onLogicalTopologyChanged,
+ clusterIdHolder
+ );
}
@AfterEach
@@ -231,6 +239,18 @@ public class CmgRaftGroupListenerTest extends
BaseIgniteAbstractTest {
));
}
+ @Test
+ void initStoresClusterId() {
+ listener.onWrite(iterator(
+ msgFactory.initCmgStateCommand()
+ .clusterState(state)
+ .node(node)
+ .build()
+ ));
+
+ assertThat(clusterIdHolder.clusterId(),
is(state.clusterTag().clusterId()));
+ }
+
private static <T extends Command> Iterator<CommandClosure<T>> iterator(T
obj) {
CommandClosure<T> closure = new CommandClosure<>() {
@Override
diff --git
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java
index 733692c82d..38d5fee32e 100644
---
a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java
+++
b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java
@@ -53,7 +53,6 @@ import
org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.manager.ComponentContext;
-import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -72,7 +71,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(MockitoExtension.class)
class LogicalTopologyImplTest extends BaseIgniteAbstractTest {
- private final ClusterStateStorage storage = spy(new
TestClusterStateStorage());
+ private final ClusterStateStorage storage =
spy(TestClusterStateStorage.initializedClusterStateStorage());
private LogicalTopology topology;
@@ -98,7 +97,7 @@ class LogicalTopologyImplTest extends BaseIgniteAbstractTest {
void setUp() {
assertThat(storage.startAsync(new ComponentContext()),
willCompleteSuccessfully());
- topology = new LogicalTopologyImpl(storage, new
ConstantClusterIdSupplier());
+ topology = new LogicalTopologyImpl(storage);
topology.addEventListener(listener);
}
diff --git
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 6c1143fa9a..96aa9d4c61 100644
---
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
@@ -133,7 +132,7 @@ public class MockNode {
new ClusterInitializer(clusterService, hocon -> hocon, new
TestConfigurationValidator()),
raftManager,
clusterStateStorage,
- new LogicalTopologyImpl(clusterStateStorage, new
ConstantClusterIdSupplier()),
+ new LogicalTopologyImpl(clusterStateStorage),
new NodeAttributesCollector(nodeAttributes,
storageProfilesConfiguration),
failureManager,
clusterIdHolder,
diff --git
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
index b9a231cdb8..9c30c4aee9 100644
---
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
+++
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/raft/TestClusterStateStorage.java
@@ -32,6 +32,8 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,8 +41,11 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Supplier;
+import org.apache.ignite.internal.cluster.management.ClusterTag;
+import
org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;
@@ -50,6 +55,8 @@ import org.jetbrains.annotations.Nullable;
public class TestClusterStateStorage implements ClusterStateStorage {
private static final String SNAPSHOT_FILE = "snapshot.bin";
+ private static final CmgMessagesFactory CMG_MESSAGES_FACTORY = new
CmgMessagesFactory();
+
private final Map<ByteArray, byte[]> map = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -59,6 +66,25 @@ public class TestClusterStateStorage implements
ClusterStateStorage {
/** Prevents double stopping of the component. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
+ /**
+ * Returns a {@link TestClusterStateStorage} containing cluster state.
+ */
+ public static TestClusterStateStorage initializedClusterStateStorage() {
+ TestClusterStateStorage storage = new TestClusterStateStorage();
+
+ // ClusterStateStorageManager instance is created here just to put cluster state to the storage in the correct format.
+ new ClusterStateStorageManager(storage).putClusterState(
+ CMG_MESSAGES_FACTORY.clusterState()
+ .cmgNodes(Set.of("test"))
+ .metaStorageNodes(Set.of("test"))
+
.clusterTag(ClusterTag.clusterTag(CMG_MESSAGES_FACTORY, "cluster", new UUID(1,
1)))
+
.version(IgniteProductVersion.CURRENT_VERSION.toString())
+ .build()
+ );
+
+ return storage;
+ }
+
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
return nullCompletedFuture();
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index 5db25e9a42..1a4ad52ba6 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -69,7 +69,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
@@ -186,10 +185,9 @@ public class
ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe
VaultManager vault = createVault(dir);
- var clusterStateStorage = new TestClusterStateStorage();
+ var clusterStateStorage =
TestClusterStateStorage.initializedClusterStateStorage();
var clusterIdService = new ClusterIdService(vault);
- clusterIdService.clusterId(UUID.randomUUID());
ConfigurationModules modules = loadConfigurationModules(log,
Thread.currentThread().getContextClassLoader());
@@ -230,7 +228,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
mock(FailureManager.class)
);
- var logicalTopology = new LogicalTopologyImpl(clusterStateStorage,
clusterIdService);
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var cmgManager = mock(ClusterManagementGroupManager.class);
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
index 2bb6740f48..42bfd52d55 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java
@@ -55,7 +55,6 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
-import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.jetbrains.annotations.Nullable;
@@ -98,11 +97,11 @@ public abstract class BaseDistributionZoneManagerTest
extends BaseIgniteAbstract
components.add(metaStorageManager);
- clusterStateStorage = new TestClusterStateStorage();
+ clusterStateStorage =
TestClusterStateStorage.initializedClusterStateStorage();
components.add(clusterStateStorage);
- topology = new LogicalTopologyImpl(clusterStateStorage, new
ConstantClusterIdSupplier());
+ topology = new LogicalTopologyImpl(clusterStateStorage);
ClusterManagementGroupManager cmgManager =
mock(ClusterManagementGroupManager.class);
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
index f1c29a9a5d..d1c3cd1fd6 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java
@@ -57,7 +57,6 @@ import
org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.raft.Loza;
@@ -156,7 +155,7 @@ abstract class ItMetaStorageMultipleNodesAbstractTest
extends IgniteAbstractTest
raftGroupEventsClientListener
);
- var logicalTopology = new LogicalTopologyImpl(clusterStateStorage,
new ConstantClusterIdSupplier());
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var clusterInitializer = new ClusterInitializer(
clusterService,
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index ea965caf33..a1ca164e3d 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -81,7 +81,6 @@ import org.apache.ignite.internal.metastorage.dsl.Operations;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.raft.Loza;
@@ -165,7 +164,7 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
components.add(clusterStateStorage);
- var logicalTopology = new LogicalTopologyImpl(clusterStateStorage,
new ConstantClusterIdSupplier());
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var clusterInitializer = new ClusterInitializer(
clusterService,
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 39a03f2ccd..7796003fd5 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -132,7 +132,6 @@ import
org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.StaticNodeFinder;
import
org.apache.ignite.internal.network.configuration.NetworkExtensionConfigurationSchema;
import org.apache.ignite.internal.network.recovery.InMemoryStaleIds;
@@ -988,7 +987,7 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
);
var clusterStateStorage = new TestClusterStateStorage();
- var logicalTopology = new LogicalTopologyImpl(clusterStateStorage,
new ConstantClusterIdSupplier());
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var clusterInitializer = new ClusterInitializer(
clusterService,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 09d587267b..bc2f831921 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -66,7 +66,6 @@ import
org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.raft.Loza;
@@ -185,7 +184,7 @@ public class ItDistributedConfigurationPropertiesTest
extends BaseIgniteAbstract
);
var clusterStateStorage = new TestClusterStateStorage();
- var logicalTopology = new LogicalTopologyImpl(clusterStateStorage,
new ConstantClusterIdSupplier());
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var clusterInitializer = new ClusterInitializer(
clusterService,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index b37fd69385..6cde9d2c4d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -58,7 +58,6 @@ import
org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metrics.NoOpMetricManager;
import org.apache.ignite.internal.network.ClusterService;
-import org.apache.ignite.internal.network.ConstantClusterIdSupplier;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.raft.Loza;
@@ -158,7 +157,7 @@ public class ItDistributedConfigurationStorageTest extends
BaseIgniteAbstractTes
);
var clusterStateStorage = new TestClusterStateStorage();
- var logicalTopology = new LogicalTopologyImpl(clusterStateStorage,
new ConstantClusterIdSupplier());
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var clusterInitializer = new ClusterInitializer(
clusterService,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 14c44d71d4..3ac74b9fd8 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -409,7 +409,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
raftGroupEventsClientListener
);
- var logicalTopology = new LogicalTopologyImpl(clusterStateStorage,
clusterIdService);
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var clusterInitializer = new ClusterInitializer(
clusterSvc,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 23b8da3dcc..b975871149 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -598,7 +598,7 @@ public class IgniteImpl implements Ignite {
RaftGroupOptionsConfigurer cmgRaftConfigurer =
RaftGroupOptionsConfigHelper.configureProperties(cmgLogStorageFactory,
cmgWorkDir.metaPath());
- var logicalTopology = new LogicalTopologyImpl(clusterStateStorage,
clusterIdService);
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
ConfigurationTreeGenerator distributedConfigurationGenerator = new
ConfigurationTreeGenerator(
modules.distributed().rootKeys(),
@@ -1072,7 +1072,7 @@ public class IgniteImpl implements Ignite {
compute,
clusterSvc,
nettyBootstrapFactory,
- () -> clusterInfo(systemDisasterRecoveryStorage),
+ () -> clusterInfo(clusterStateStorageMgr),
metricManager,
new ClientHandlerMetricSource(),
authenticationManager,
@@ -1095,8 +1095,11 @@ public class IgniteImpl implements Ignite {
publicCatalog = new PublicApiThreadingIgniteCatalog(new
IgniteCatalogSqlImpl(sql, distributedTblMgr), asyncContinuationExecutor);
}
- private static ClusterInfo clusterInfo(SystemDisasterRecoveryStorage
systemDisasterRecoveryStorage) {
- ClusterState clusterState =
systemDisasterRecoveryStorage.readClusterState();
+ private static ClusterInfo clusterInfo(ClusterStateStorageManager
clusterStateStorageManager) {
+ // It is safe to read the cluster state from the CMG state storage here, as this read can only
happen when the node is initialized and fully started,
+ // and at that point the cluster state is already available in the
CMG state storage.
+
+ ClusterState clusterState =
clusterStateStorageManager.getClusterState();
assert clusterState != null : "Cluster state cannot be null at the
moment when a client connects";
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java
index 0887aa3764..731827fc04 100644
---
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java
@@ -47,6 +47,9 @@ public class ClusterIdService implements ClusterIdSupplier,
ClusterIdStore, Igni
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
+ // Reading from the SystemDisasterRecoveryStorage and not from
ClusterStateStorage (used by the CMG) because the latter is recreated
+ // on each start and there could be moments during start when an
initialized node does not have cluster state in
+ // the ClusterStateStorage.
ClusterState clusterState = storage.readClusterState();
if (clusterState != null) {
clusterId(clusterState.clusterTag().clusterId());
diff --git
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
index 1462bbe296..eda3af1522 100644
---
a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
+++
b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java
@@ -53,7 +53,13 @@ public class SystemDisasterRecoveryStorage implements
ClusterResetStorage {
vault.remove(RESET_CLUSTER_MESSAGE_VAULT_KEY);
}
- /** Reads cluster state. */
+ /**
+ * Reads cluster state from the Vault. This is used when the cluster state may
need to be read during node startup (when the usual
+ * CMG state storage might still be empty).
+ *
+ * @return Cluster state saved to the Vault or {@code null} if it was not
saved yet (which means that the node has not joined
+ * the cluster yet).
+ */
public @Nullable ClusterState readClusterState() {
return readFromVault(CLUSTER_STATE_VAULT_KEY);
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 9faadad4e0..55b617ff3a 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1175,7 +1175,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
));
var clusterStateStorage = new TestClusterStateStorage();
- var logicalTopology = new LogicalTopologyImpl(clusterStateStorage,
clusterIdService);
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var clusterInitializer = new ClusterInitializer(
clusterService,