This is an automated email from the ASF dual-hosted git repository. rpuch pushed a commit to branch ignite-23223 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit ac15af07a5e18692d0d68e9cbc2cb7b7a4007fec Author: Roman Puchkovskiy <[email protected]> AuthorDate: Wed Sep 18 11:11:39 2024 +0400 IGNITE-23223 Obtain clusterId from cluster state in LogicalTopology --- .../management/raft/ItCmgRaftServiceTest.java | 7 ++--- .../management/ClusterManagementGroupManager.java | 7 ++--- .../management/raft/CmgRaftGroupListener.java | 10 +++++++- .../management/topology/LogicalTopologyImpl.java | 30 +++++++++++++++++----- .../management/raft/CmgRaftGroupListenerTest.java | 26 ++++++++++++++++--- .../topology/LogicalTopologyImplTest.java | 3 +-- .../internal/cluster/management/MockNode.java | 3 +-- ...niteDistributionZoneManagerNodeRestartTest.java | 2 +- .../BaseDistributionZoneManagerTest.java | 19 ++++++++++++-- .../ItMetaStorageMultipleNodesAbstractTest.java | 3 +-- .../metastorage/impl/ItMetaStorageWatchTest.java | 3 +-- .../replicator/ItReplicaLifecycleTest.java | 3 +-- .../ItDistributedConfigurationPropertiesTest.java | 3 +-- .../ItDistributedConfigurationStorageTest.java | 3 +-- .../runner/app/ItIgniteNodeRestartTest.java | 2 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 11 +++++--- .../internal/disaster/system/ClusterIdService.java | 3 +++ .../system/SystemDisasterRecoveryStorage.java | 5 +++- .../rebalance/ItRebalanceDistributedTest.java | 2 +- 19 files changed, 102 insertions(+), 43 deletions(-) diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java index 2209a20390..6f4d544269 100644 --- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java +++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.cluster.management.ClusterIdHolder; import org.apache.ignite.internal.cluster.management.ClusterState; import org.apache.ignite.internal.cluster.management.ClusterTag; import org.apache.ignite.internal.cluster.management.CmgGroupId; @@ -59,7 +60,6 @@ import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.manager.ComponentContext; import org.apache.ignite.internal.network.ClusterService; -import org.apache.ignite.internal.network.ConstantClusterIdSupplier; import org.apache.ignite.internal.network.NodeFinder; import org.apache.ignite.internal.network.StaticNodeFinder; import org.apache.ignite.internal.properties.IgniteProductVersion; @@ -120,7 +120,7 @@ public class ItCmgRaftServiceTest extends BaseIgniteAbstractTest { workingDir.raftLogPath() ); this.raftManager = TestLozaFactory.create(clusterService, raftConfiguration, new HybridClockImpl()); - this.logicalTopology = new LogicalTopologyImpl(clusterStateStorage, new ConstantClusterIdSupplier()); + this.logicalTopology = new LogicalTopologyImpl(clusterStateStorage); } void start() { @@ -156,7 +156,8 @@ public class ItCmgRaftServiceTest extends BaseIgniteAbstractTest { clusterStateStorageMgr, logicalTopology, new ValidationManager(clusterStateStorageMgr, logicalTopology), - term -> {} + term -> {}, + new ClusterIdHolder() ), RaftGroupEventsListener.noopLsnr, RaftGroupOptionsConfigHelper.configureProperties(partitionsLogStorageFactory, workingDir.metaPath()) diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java index a456fe8bf3..70d5509959 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java @@ -528,8 +528,6 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster localStateStorage.saveLocalState(localState); - clusterIdStore.clusterId(state.clusterTag().clusterId()); - return joinCluster(service, state.clusterTag()); }); } @@ -785,7 +783,8 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster clusterStateStorageMgr, logicalTopology, validationManager, - this::onLogicalTopologyChanged + this::onLogicalTopologyChanged, + clusterIdStore ), this::onElectedAsLeader, raftGroupOptionsConfigurer @@ -829,8 +828,6 @@ public class ClusterManagementGroupManager extends AbstractEventProducer<Cluster localStateStorage.saveLocalState(localState); - clusterIdStore.clusterId(state.clusterTag().clusterId()); - return joinCluster(service, state.clusterTag()); }); } diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java index acd87cb1b4..17647d659c 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.function.Consumer; import java.util.function.LongConsumer; import java.util.stream.Collectors; +import org.apache.ignite.internal.cluster.management.ClusterIdStore; import org.apache.ignite.internal.cluster.management.ClusterState; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory; import org.apache.ignite.internal.cluster.management.raft.commands.ChangeMetastorageNodesCommand; @@ -75,6 +76,8 @@ public class CmgRaftGroupListener implements RaftGroupListener { private final LongConsumer onLogicalTopologyChanged; + private final ClusterIdStore clusterIdStore; + private final CmgMessagesFactory cmgMessagesFactory = new CmgMessagesFactory(); /** @@ -84,17 +87,20 @@ public class CmgRaftGroupListener implements RaftGroupListener { * @param logicalTopology Logical topology that will be updated by this listener. * @param validationManager Validator for cluster nodes. * @param onLogicalTopologyChanged Callback invoked (with the corresponding RAFT term) when logical topology gets changed. + * @param clusterIdStore Used to store cluster ID when init command is executed. */ public CmgRaftGroupListener( ClusterStateStorageManager storageManager, LogicalTopology logicalTopology, ValidationManager validationManager, - LongConsumer onLogicalTopologyChanged + LongConsumer onLogicalTopologyChanged, + ClusterIdStore clusterIdStore ) { this.storageManager = storageManager; this.logicalTopology = logicalTopology; this.validationManager = validationManager; this.onLogicalTopologyChanged = onLogicalTopologyChanged; + this.clusterIdStore = clusterIdStore; } @Override @@ -199,6 +205,8 @@ public class CmgRaftGroupListener implements RaftGroupListener { if (state == null) { storageManager.putClusterState(command.clusterState()); + clusterIdStore.clusterId(command.clusterState().clusterTag().clusterId()); + return command.clusterState(); } else { ValidationResult validationResult = ValidationManager.validateState( diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java index ec7ab64b8b..52274ee983 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java @@ -32,13 +32,15 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; +import org.apache.ignite.internal.cluster.management.ClusterState; import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage; +import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; -import org.apache.ignite.internal.network.ClusterIdSupplier; +import org.jetbrains.annotations.Nullable; /** * Implementation of {@link LogicalTopology}. @@ -51,13 +53,17 @@ public class LogicalTopologyImpl implements LogicalTopology { private final ClusterStateStorage storage; - private final ClusterIdSupplier clusterIdSupplier; + private final ClusterStateStorageManager clusterStateStorageManager; private final List<LogicalTopologyEventListener> listeners = new CopyOnWriteArrayList<>(); - public LogicalTopologyImpl(ClusterStateStorage storage, ClusterIdSupplier clusterIdSupplier) { + private volatile @Nullable UUID clusterId; + + /** Constructor. */ + public LogicalTopologyImpl(ClusterStateStorage storage) { this.storage = storage; - this.clusterIdSupplier = clusterIdSupplier; + + clusterStateStorageManager = new ClusterStateStorageManager(storage); } @Override @@ -128,11 +134,21 @@ public class LogicalTopologyImpl implements LogicalTopology { } private UUID requiredClusterId() { - UUID clusterId = clusterIdSupplier.clusterId(); + UUID localClusterId = clusterId; + if (localClusterId != null) { + return localClusterId; + } + + // It is safe to read cluster state from the CMG storage as it was either restored from a snapshot (and has cluster state), + // or init command was executed before current command and put cluster state to the CMG storage. + ClusterState clusterState = clusterStateStorageManager.getClusterState(); + assert clusterState != null : "clusterState cannot be null when commands are already being executed by the CMG state machine"; - assert clusterId != null : "clusterId cannot be null when commands are already being executed by the CMG state machine"; + // clusterId cannot have different non-null values for the same node during the same launch, so we don't need to synchronize. + localClusterId = clusterState.clusterTag().clusterId(); + clusterId = localClusterId; - return clusterId; + return localClusterId; } private void saveSnapshotToStorage(LogicalTopologySnapshot newTopology) { diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java index 6f7131ed05..aaf888b276 100644 --- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java +++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java @@ -43,6 +43,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.function.LongConsumer; +import org.apache.ignite.internal.cluster.management.ClusterIdHolder; import org.apache.ignite.internal.cluster.management.ClusterState; import org.apache.ignite.internal.cluster.management.ClusterTag; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory; @@ -52,7 +53,6 @@ import org.apache.ignite.internal.cluster.management.topology.LogicalTopology; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.manager.ComponentContext; -import org.apache.ignite.internal.network.ConstantClusterIdSupplier; import org.apache.ignite.internal.properties.IgniteProductVersion; import org.apache.ignite.internal.raft.Command; import org.apache.ignite.internal.raft.service.CommandClosure; @@ -77,7 +77,7 @@ public class CmgRaftGroupListenerTest extends BaseIgniteAbstractTest { private LongConsumer onLogicalTopologyChanged; @Spy - private final LogicalTopology logicalTopology = new LogicalTopologyImpl(storage, new ConstantClusterIdSupplier()); + private final LogicalTopology logicalTopology = new LogicalTopologyImpl(storage); private CmgRaftGroupListener listener; @@ -96,6 +96,8 @@ public class CmgRaftGroupListenerTest extends BaseIgniteAbstractTest { private final ClusterNodeMessage node = msgFactory.clusterNodeMessage().id("foo").name("bar").host("localhost").port(666).build(); + private final ClusterIdHolder clusterIdHolder = new ClusterIdHolder(); + @BeforeEach void setUp() { assertThat(storage.startAsync(new ComponentContext()), willCompleteSuccessfully()); @@ -103,7 +105,13 @@ public class CmgRaftGroupListenerTest extends BaseIgniteAbstractTest { var clusterStateStorageMgr = new ClusterStateStorageManager(storage); var validationManager = new ValidationManager(clusterStateStorageMgr, logicalTopology); - listener = new CmgRaftGroupListener(clusterStateStorageMgr, logicalTopology, validationManager, onLogicalTopologyChanged); + listener = new CmgRaftGroupListener( + clusterStateStorageMgr, + logicalTopology, + validationManager, + onLogicalTopologyChanged, + clusterIdHolder + ); } @AfterEach @@ -231,6 +239,18 @@ public class CmgRaftGroupListenerTest extends BaseIgniteAbstractTest { )); } + @Test + void initStoresClusterId() { + listener.onWrite(iterator( + msgFactory.initCmgStateCommand() + .clusterState(state) + .node(node) + .build() + )); + + assertThat(clusterIdHolder.clusterId(), is(state.clusterTag().clusterId())); + } + private static <T extends Command> Iterator<CommandClosure<T>> iterator(T obj) { CommandClosure<T> closure = new CommandClosure<>() { @Override diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java index 733692c82d..51cb10363f 100644 --- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java +++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java @@ -53,7 +53,6 @@ import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot; import org.apache.ignite.internal.manager.ComponentContext; -import org.apache.ignite.internal.network.ConstantClusterIdSupplier; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; @@ -98,7 +97,7 @@ class LogicalTopologyImplTest extends BaseIgniteAbstractTest { void setUp() { assertThat(storage.startAsync(new ComponentContext()), willCompleteSuccessfully()); - topology = new LogicalTopologyImpl(storage, new ConstantClusterIdSupplier()); + topology = new LogicalTopologyImpl(storage); topology.addEventListener(listener); } diff --git a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java index 6c1143fa9a..96aa9d4c61 100644 --- a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java +++ b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java @@ -45,7 +45,6 @@ import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.ComponentContext; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.network.ClusterService; -import org.apache.ignite.internal.network.ConstantClusterIdSupplier; import org.apache.ignite.internal.network.NodeFinder; import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer; @@ -133,7 +132,7 @@ public class MockNode { new ClusterInitializer(clusterService, hocon -> hocon, new TestConfigurationValidator()), raftManager, clusterStateStorage, - new LogicalTopologyImpl(clusterStateStorage, new ConstantClusterIdSupplier()), + new LogicalTopologyImpl(clusterStateStorage), new NodeAttributesCollector(nodeAttributes, storageProfilesConfiguration), failureManager, clusterIdHolder, diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java index 5db25e9a42..ac92c5245d 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java @@ -230,7 +230,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe mock(FailureManager.class) ); - var logicalTopology = new LogicalTopologyImpl(clusterStateStorage, clusterIdService); + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); var cmgManager = mock(ClusterManagementGroupManager.class); diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java index 2bb6740f48..eaade82106 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java @@ -32,6 +32,8 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -42,7 +44,10 @@ import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.CatalogTestUtils; import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; +import org.apache.ignite.internal.cluster.management.ClusterTag; +import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory; import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage; +import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager; import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage; import org.apache.ignite.internal.cluster.management.topology.LogicalTopology; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; @@ -55,7 +60,7 @@ import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; -import org.apache.ignite.internal.network.ConstantClusterIdSupplier; +import org.apache.ignite.internal.properties.IgniteProductVersion; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.jetbrains.annotations.Nullable; @@ -88,6 +93,8 @@ public abstract class BaseDistributionZoneManagerTest extends BaseIgniteAbstract private final List<IgniteComponent> components = new ArrayList<>(); + private final CmgMessagesFactory cmgMessagesFactory = new CmgMessagesFactory(); + @BeforeEach void setUp() throws Exception { String nodeName = "test"; @@ -99,10 +106,18 @@ public abstract class BaseDistributionZoneManagerTest extends BaseIgniteAbstract components.add(metaStorageManager); clusterStateStorage = new TestClusterStateStorage(); + new ClusterStateStorageManager(clusterStateStorage).putClusterState( + cmgMessagesFactory.clusterState() + .cmgNodes(Set.of("test")) + .metaStorageNodes(Set.of("test")) + .clusterTag(ClusterTag.clusterTag(cmgMessagesFactory, "cluster", new UUID(1, 1))) + .version(IgniteProductVersion.CURRENT_VERSION.toString()) + .build() + ); components.add(clusterStateStorage); - topology = new LogicalTopologyImpl(clusterStateStorage, new ConstantClusterIdSupplier()); + topology = new LogicalTopologyImpl(clusterStateStorage); ClusterManagementGroupManager cmgManager = mock(ClusterManagementGroupManager.class); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java index f1c29a9a5d..d1c3cd1fd6 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java @@ -57,7 +57,6 @@ import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.metrics.NoOpMetricManager; import org.apache.ignite.internal.network.ClusterService; -import org.apache.ignite.internal.network.ConstantClusterIdSupplier; import org.apache.ignite.internal.network.StaticNodeFinder; import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; import org.apache.ignite.internal.raft.Loza; @@ -156,7 +155,7 @@ abstract class ItMetaStorageMultipleNodesAbstractTest extends IgniteAbstractTest raftGroupEventsClientListener ); - var logicalTopology = new LogicalTopologyImpl(clusterStateStorage, new ConstantClusterIdSupplier()); + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); var clusterInitializer = new ClusterInitializer( clusterService, diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java index ea965caf33..a1ca164e3d 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java @@ -81,7 +81,6 @@ import org.apache.ignite.internal.metastorage.dsl.Operations; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.metrics.NoOpMetricManager; import org.apache.ignite.internal.network.ClusterService; -import org.apache.ignite.internal.network.ConstantClusterIdSupplier; import org.apache.ignite.internal.network.StaticNodeFinder; import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; import org.apache.ignite.internal.raft.Loza; @@ -165,7 +164,7 @@ public class ItMetaStorageWatchTest extends IgniteAbstractTest { components.add(clusterStateStorage); - var logicalTopology = new LogicalTopologyImpl(clusterStateStorage, new ConstantClusterIdSupplier()); + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); var clusterInitializer = new ClusterInitializer( clusterService, diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java index 39a03f2ccd..7796003fd5 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java @@ -132,7 +132,6 @@ import org.apache.ignite.internal.metastorage.server.KeyValueStorage; import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage; import org.apache.ignite.internal.metrics.NoOpMetricManager; import org.apache.ignite.internal.network.ClusterService; -import org.apache.ignite.internal.network.ConstantClusterIdSupplier; import org.apache.ignite.internal.network.StaticNodeFinder; import org.apache.ignite.internal.network.configuration.NetworkExtensionConfigurationSchema; import org.apache.ignite.internal.network.recovery.InMemoryStaleIds; @@ -988,7 +987,7 @@ public class ItReplicaLifecycleTest extends BaseIgniteAbstractTest { ); var clusterStateStorage = new TestClusterStateStorage(); - var logicalTopology = new LogicalTopologyImpl(clusterStateStorage, new ConstantClusterIdSupplier()); + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); var clusterInitializer = new ClusterInitializer( clusterService, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java index 09d587267b..bc2f831921 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java @@ -66,7 +66,6 @@ import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.metrics.NoOpMetricManager; import org.apache.ignite.internal.network.ClusterService; -import org.apache.ignite.internal.network.ConstantClusterIdSupplier; import org.apache.ignite.internal.network.StaticNodeFinder; import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; import org.apache.ignite.internal.raft.Loza; @@ -185,7 +184,7 @@ public class ItDistributedConfigurationPropertiesTest extends BaseIgniteAbstract ); var clusterStateStorage = new TestClusterStateStorage(); - var logicalTopology = new LogicalTopologyImpl(clusterStateStorage, new ConstantClusterIdSupplier()); + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); var clusterInitializer = new ClusterInitializer( clusterService, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java index b37fd69385..6cde9d2c4d 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java @@ -58,7 +58,6 @@ import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl; import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage; import org.apache.ignite.internal.metrics.NoOpMetricManager; import org.apache.ignite.internal.network.ClusterService; -import org.apache.ignite.internal.network.ConstantClusterIdSupplier; import org.apache.ignite.internal.network.StaticNodeFinder; import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; import org.apache.ignite.internal.raft.Loza; @@ -158,7 +157,7 @@ public class ItDistributedConfigurationStorageTest extends BaseIgniteAbstractTes ); var clusterStateStorage = new TestClusterStateStorage(); - var logicalTopology = new LogicalTopologyImpl(clusterStateStorage, new ConstantClusterIdSupplier()); + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); var clusterInitializer = new ClusterInitializer( clusterService, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 14c44d71d4..3ac74b9fd8 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -409,7 +409,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { raftGroupEventsClientListener ); - var logicalTopology = new LogicalTopologyImpl(clusterStateStorage, clusterIdService); + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); var clusterInitializer = new ClusterInitializer( clusterSvc, diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 23b8da3dcc..b975871149 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -598,7 +598,7 @@ public class IgniteImpl implements Ignite { RaftGroupOptionsConfigurer cmgRaftConfigurer = RaftGroupOptionsConfigHelper.configureProperties(cmgLogStorageFactory, cmgWorkDir.metaPath()); - var logicalTopology = new LogicalTopologyImpl(clusterStateStorage, clusterIdService); + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); ConfigurationTreeGenerator distributedConfigurationGenerator = new ConfigurationTreeGenerator( modules.distributed().rootKeys(), @@ -1072,7 +1072,7 @@ public class IgniteImpl implements Ignite { compute, clusterSvc, nettyBootstrapFactory, - () -> clusterInfo(systemDisasterRecoveryStorage), + () -> clusterInfo(clusterStateStorageMgr), metricManager, new ClientHandlerMetricSource(), authenticationManager, @@ -1095,8 +1095,11 @@ public class IgniteImpl implements Ignite { publicCatalog = new PublicApiThreadingIgniteCatalog(new IgniteCatalogSqlImpl(sql, distributedTblMgr), asyncContinuationExecutor); } - private static ClusterInfo clusterInfo(SystemDisasterRecoveryStorage systemDisasterRecoveryStorage) { - ClusterState clusterState = systemDisasterRecoveryStorage.readClusterState(); + private static ClusterInfo clusterInfo(ClusterStateStorageManager clusterStateStorageManager) { + // It is safe to read cluster state from CMG state as it can only be read when the node is initialized and fully started, + // and in those moments the cluster state is already available in the CMG state storage. + + ClusterState clusterState = clusterStateStorageManager.getClusterState(); assert clusterState != null : "Cluster state cannot be null at the moment when a client connects"; diff --git a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java index 0887aa3764..731827fc04 100644 --- a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java +++ b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/ClusterIdService.java @@ -47,6 +47,9 @@ public class ClusterIdService implements ClusterIdSupplier, ClusterIdStore, Igni @Override public CompletableFuture<Void> startAsync(ComponentContext componentContext) { + // Reading from the SystemDisasterRecoveryStorage and not from ClusterStateStorage (used by the CMG) because the latter is recreated + // on each start and there could be moments during start when an initialized node does not have cluster state in + // the ClusterStateStorage. ClusterState clusterState = storage.readClusterState(); if (clusterState != null) { clusterId(clusterState.clusterTag().clusterId()); diff --git a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java index 1462bbe296..d7b80b04d3 100644 --- a/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java +++ b/modules/system-disaster-recovery/src/main/java/org/apache/ignite/internal/disaster/system/SystemDisasterRecoveryStorage.java @@ -53,7 +53,10 @@ public class SystemDisasterRecoveryStorage implements ClusterResetStorage { vault.remove(RESET_CLUSTER_MESSAGE_VAULT_KEY); } - /** Reads cluster state. */ + /** + * Reads cluster state. This is used for cases when it may be needed to read it during node startup (and the usual CMG state storage + * might be empty at those moments). + */ public @Nullable ClusterState readClusterState() { return readFromVault(CLUSTER_STATE_VAULT_KEY); } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 9faadad4e0..55b617ff3a 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -1175,7 +1175,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { )); var clusterStateStorage = new TestClusterStateStorage(); - var logicalTopology = new LogicalTopologyImpl(clusterStateStorage, clusterIdService); + var logicalTopology = new LogicalTopologyImpl(clusterStateStorage); var clusterInitializer = new ClusterInitializer( clusterService,
