This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bab4e35a89 IGNITE-19464 Move out logic of applying cluster
configuration from ClusterManagementGroupManager (#2066)
bab4e35a89 is described below
commit bab4e35a89bb117a3f599a0126fc179a37a84a0c
Author: Ivan Gagarkin <[email protected]>
AuthorDate: Thu May 18 19:43:38 2023 +0400
IGNITE-19464 Move out logic of applying cluster configuration from
ClusterManagementGroupManager (#2066)
---
modules/cluster-management/build.gradle | 2 +
.../cluster/management/ItClusterManagerTest.java | 91 ++++++++++++----
.../management/ClusterManagementGroupManager.java | 82 ++++++++-------
.../UpdateDistributedConfigurationAction.java | 66 ++++++++++++
.../internal/cluster/management/MockNode.java | 4 -
.../impl/ItMetaStorageMultipleNodesTest.java | 12 +--
.../metastorage/impl/ItMetaStorageWatchTest.java | 8 --
.../ItDistributedConfigurationPropertiesTest.java | 11 +-
.../ItDistributedConfigurationStorageTest.java | 10 +-
.../storage/ItRebalanceDistributedTest.java | 9 +-
.../runner/app/ItIgniteNodeRestartTest.java | 5 -
.../org/apache/ignite/internal/app/IgniteImpl.java | 18 ++--
.../DistributedConfigurationUpdater.java | 47 ++++-----
.../DistributedConfigurationUpdaterTest.java | 115 +++++++++++++++++++++
14 files changed, 338 insertions(+), 142 deletions(-)
diff --git a/modules/cluster-management/build.gradle
b/modules/cluster-management/build.gradle
index 6d1773910e..010cf2d9b4 100644
--- a/modules/cluster-management/build.gradle
+++ b/modules/cluster-management/build.gradle
@@ -69,4 +69,6 @@ dependencies {
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation
testFixtures(project(':ignite-configuration'))
integrationTestImplementation testFixtures(project(':ignite-network'))
+ integrationTestImplementation libs.awaitility
+ integrationTestImplementation libs.jetbrains.annotations
}
diff --git
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
index 2128d516c3..e57ce8eb8f 100644
---
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
+++
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.cluster.management;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -30,7 +32,9 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -42,6 +46,8 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
+import org.awaitility.Awaitility;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -334,16 +340,7 @@ public class ItClusterManagerTest extends
BaseItClusterManagementTest {
assertThat(node.clusterManager().joinFuture(),
willCompleteSuccessfully());
// Find the CMG leader and stop it
- MockNode leaderNode = cluster.stream()
- .filter(n -> {
- CompletableFuture<Boolean> isLeader =
n.clusterManager().isCmgLeader();
-
- assertThat(isLeader, willCompleteSuccessfully());
-
- return isLeader.join();
- })
- .findAny()
- .orElseThrow();
+ MockNode leaderNode = findLeaderNode(cluster).orElseThrow();
stopNodes(List.of(leaderNode));
@@ -351,6 +348,45 @@ public class ItClusterManagerTest extends
BaseItClusterManagementTest {
assertThat(node.clusterManager().onJoinReady(),
willCompleteSuccessfully());
}
+ @Test
+ void
testClusterConfigurationIsRemovedFromClusterStateAfterUpdating(TestInfo
testInfo) throws Exception {
+ // Start a cluster of 3 nodes so that the CMG leader node could be
stopped later.
+ startCluster(3, testInfo);
+
+ String[] cmgNodes = clusterNodeNames();
+
+ // Start the CMG on all 3 nodes.
+ String clusterConfiguration = "security.authentication.enabled:true";
+ initCluster(cmgNodes, cmgNodes, clusterConfiguration);
+
+ // Find the CMG leader (it is stopped later in this test).
+ MockNode leaderNode = findLeaderNode(cluster).orElseThrow();
+
+ // Read the cluster configuration update action from the leader (nothing is removed yet).
+ UpdateDistributedConfigurationAction leaderAction =
leaderNode.clusterManager()
+ .clusterConfigurationToUpdate()
+ .get();
+
+ // Check the leader has configuration.
+ assertEquals(clusterConfiguration, leaderAction.configuration());
+
+ // Execute the next action (remove the configuration from the cluster
state)
+ assertThat(leaderAction.nextAction().get(),
willCompleteSuccessfully());
+
+ // Stop the cluster leader to check the new leader is not going to
update the configuration.
+ stopNodes(List.of(leaderNode));
+ cluster.remove(leaderNode);
+
+ // Wait for a new leader to be elected.
+ Awaitility.await().until(() -> findLeaderNode(cluster).isPresent());
+
+ // Find the new CMG leader.
+ MockNode newLeaderNode = findLeaderNode(cluster).orElseThrow();
+
+ // Check the new leader cancels the action.
+
assertThat(newLeaderNode.clusterManager().clusterConfigurationToUpdate(),
willThrow(CancellationException.class));
+ }
+
@Test
void testLeaderChangeBeforeJoin(TestInfo testInfo) throws Exception {
// Start a cluster of 3 nodes so that the CMG leader node could be
stopped later.
@@ -362,16 +398,7 @@ public class ItClusterManagerTest extends
BaseItClusterManagementTest {
initCluster(cmgNodes, cmgNodes);
// Find the CMG leader and stop it
- MockNode leaderNode = cluster.stream()
- .filter(n -> {
- CompletableFuture<Boolean> isLeader =
n.clusterManager().isCmgLeader();
-
- assertThat(isLeader, willCompleteSuccessfully());
-
- return isLeader.join();
- })
- .findAny()
- .orElseThrow();
+ MockNode leaderNode = findLeaderNode(cluster).orElseThrow();
stopNodes(List.of(leaderNode));
@@ -414,6 +441,18 @@ public class ItClusterManagerTest extends
BaseItClusterManagementTest {
assertTrue(waitForCondition(() ->
nonCmgTopology.getLogicalTopology().nodes().size() == 2, 10_000));
}
+ private Optional<MockNode> findLeaderNode(List<MockNode> cluster) {
+ return cluster.stream()
+ .filter(n -> {
+ CompletableFuture<Boolean> isLeader =
n.clusterManager().isCmgLeader();
+
+ assertThat(isLeader, willCompleteSuccessfully());
+
+ return isLeader.join();
+ })
+ .findAny();
+ }
+
private List<ClusterNode> currentPhysicalTopology() {
return cluster.stream()
.map(MockNode::localMember)
@@ -440,11 +479,21 @@ public class ItClusterManagerTest extends
BaseItClusterManagementTest {
}, 10000));
}
+
private void initCluster(String[] metaStorageNodes, String[] cmgNodes)
throws NodeStoppingException {
+ initCluster(metaStorageNodes, cmgNodes, null);
+ }
+
+ private void initCluster(
+ String[] metaStorageNodes,
+ String[] cmgNodes,
+ @Nullable String clusterConfiguration
+ ) throws NodeStoppingException {
cluster.get(0).clusterManager().initCluster(
Arrays.asList(metaStorageNodes),
Arrays.asList(cmgNodes),
- "cluster"
+ "cluster",
+ clusterConfiguration
);
for (MockNode node : cluster) {
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index b59c518be2..25374c2d95 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -107,6 +107,10 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
*/
private final CompletableFuture<Void> joinFuture = new
CompletableFuture<>();
+ // TODO: IGNITE-19489 Cancel updateDistributedConfigurationActionFuture if
the configuration is applied
+ private final CompletableFuture<UpdateDistributedConfigurationAction>
updateDistributedConfigurationActionFuture =
+ new CompletableFuture<>();
+
/** Message factory. */
private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
@@ -130,8 +134,6 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
/** Handles cluster initialization flow. */
private final ClusterInitializer clusterInitializer;
- private final DistributedConfigurationUpdater
distributedConfigurationUpdater;
-
/** Node's attributes configuration. */
private final NodeAttributesConfiguration nodeAttributes;
@@ -143,7 +145,6 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
ClusterStateStorage clusterStateStorage,
LogicalTopology logicalTopology,
ClusterManagementConfiguration configuration,
- DistributedConfigurationUpdater distributedConfigurationUpdater,
NodeAttributesConfiguration nodeAttributes
) {
this.clusterService = clusterService;
@@ -151,8 +152,6 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
this.clusterStateStorage = clusterStateStorage;
this.logicalTopology = logicalTopology;
this.configuration = configuration;
- this.distributedConfigurationUpdater = distributedConfigurationUpdater;
-
this.localStateStorage = new LocalStateStorage(vault);
this.clusterInitializer = new ClusterInitializer(clusterService);
this.nodeAttributes = nodeAttributes;
@@ -389,45 +388,47 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
}
});
- raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
- }
-
- private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService
service) {
- return service.readClusterState()
- .thenCompose(state -> {
- if (state == null) {
- LOG.info("No CMG state found in the Raft service");
- return completedFuture(null);
- } else if (state.clusterConfigurationToApply() == null) {
- // Config was applied or wasn't provided
- LOG.info("No cluster configuration found in the Raft
service");
- return completedFuture(null);
+ raftServiceAfterJoin().thenCompose(service ->
service.readClusterState()
+ .whenComplete((state, e) -> {
+ if (e != null) {
+ LOG.error("Error when retrieving cluster
configuration", e);
+
updateDistributedConfigurationActionFuture.completeExceptionally(e);
} else {
- LOG.info("Cluster configuration is found in the Raft
service, going to apply it");
- return
distributedConfigurationUpdater.updateConfiguration(state.clusterConfigurationToApply())
- .thenCompose(unused ->
removeClusterConfigFromClusterState(service));
+ String configuration =
state.clusterConfigurationToApply();
+ if (configuration != null) {
+
updateDistributedConfigurationActionFuture.complete(
+ new UpdateDistributedConfigurationAction(
+ configuration,
+ () ->
removeClusterConfigFromClusterState(service)
+ ));
+ } else {
+
updateDistributedConfigurationActionFuture.cancel(true);
+ }
}
- });
+ })
+ );
}
private CompletableFuture<Void>
removeClusterConfigFromClusterState(CmgRaftService service) {
return service.readClusterState()
.thenCompose(state -> {
- Collection<String> cmgNodes = state.cmgNodes();
- Collection<String> msNodes = state.metaStorageNodes();
- IgniteProductVersion igniteVersion = state.igniteVersion();
- ClusterTag clusterTag = state.clusterTag();
- ClusterState clusterState = msgFactory.clusterState()
- .cmgNodes(Set.copyOf(cmgNodes))
- .metaStorageNodes(Set.copyOf(msNodes))
- .version(igniteVersion.toString())
- .clusterTag(clusterTag)
- .build();
- return service.updateClusterState(clusterState);
- })
- .whenComplete((v, e) -> {
- if (e != null) {
- LOG.warn("Error when removing cluster configuration",
e);
+ if (state.clusterConfigurationToApply() != null) {
+ ClusterState clusterState = msgFactory.clusterState()
+ .cmgNodes(Set.copyOf(state.cmgNodes()))
+
.metaStorageNodes(Set.copyOf(state.metaStorageNodes()))
+ .version(state.igniteVersion().toString())
+ .clusterTag(state.clusterTag())
+ .build();
+ return service.updateClusterState(clusterState)
+ .whenComplete((v, e) -> {
+ if (e != null) {
+ LOG.error("Error when removing
configuration from cluster state", e);
+ } else {
+ LOG.info("Cluster configuration is
removed from cluster state");
+ }
+ });
+ } else {
+ return completedFuture(null);
}
});
}
@@ -718,8 +719,9 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
raftManager.stopRaftNodes(CmgGroupId.INSTANCE);
- // Fail the future to unblock dependent operations
+ // Fail the futures to unblock dependent operations
joinFuture.completeExceptionally(new NodeStoppingException());
+ updateDistributedConfigurationActionFuture.completeExceptionally(new
NodeStoppingException());
}
/**
@@ -828,6 +830,10 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
}
}
+ public CompletableFuture<UpdateDistributedConfigurationAction>
clusterConfigurationToUpdate() {
+ return updateDistributedConfigurationActionFuture;
+ }
+
/**
* Returns a future that resolves to {@code true} if the current node is
the CMG leader.
*/
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java
new file mode 100644
index 0000000000..9503616e1f
--- /dev/null
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+/**
+ * Action to update the distributed configuration.
+ */
+public class UpdateDistributedConfigurationAction {
+
+ /**
+ * Configuration that should be applied.
+ */
+ private final String configuration;
+
+ /**
+ * The next action to execute.
+ */
+ private final Supplier<CompletableFuture<Void>> nextAction;
+
+ /**
+ * Constructor.
+ *
+ * @param configuration the configuration.
+ * @param nextAction the next action.
+ */
+ public UpdateDistributedConfigurationAction(String configuration,
Supplier<CompletableFuture<Void>> nextAction) {
+ this.configuration = configuration;
+ this.nextAction = nextAction;
+ }
+
+ /**
+ * Returns the configuration.
+ *
+ * @return the configuration.
+ */
+ public String configuration() {
+ return configuration;
+ }
+
+ /**
+ * Returns the next action to execute.
+ *
+ * @return the next action.
+ */
+ public Supplier<CompletableFuture<Void>> nextAction() {
+ return nextAction;
+ }
+}
diff --git
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 1808ca1e1f..e3a6dded4d 100644
---
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -110,8 +110,6 @@ public class MockNode {
var logicalTopologyService = new
LogicalTopologyImpl(clusterStateStorage);
- var distributedConfigurationUpdater = new
DistributedConfigurationUpdater();
-
this.clusterManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
@@ -119,7 +117,6 @@ public class MockNode {
clusterStateStorage,
logicalTopologyService,
cmgConfiguration,
- distributedConfigurationUpdater,
nodeAttributes
);
@@ -127,7 +124,6 @@ public class MockNode {
components.add(clusterService);
components.add(raftManager);
components.add(clusterStateStorage);
- components.add(distributedConfigurationUpdater);
components.add(clusterManager);
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
index 045cb88766..a817026ba3 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
@@ -40,7 +40,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
import
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
@@ -113,8 +112,6 @@ public class ItMetaStorageMultipleNodesTest extends
IgniteAbstractTest {
private final MetaStorageManagerImpl metaStorageManager;
- private final DistributedConfigurationUpdater
distributedConfigurationUpdater;
-
Node(ClusterService clusterService, Path dataPath) {
this.clusterService = clusterService;
@@ -132,8 +129,6 @@ public class ItMetaStorageMultipleNodesTest extends
IgniteAbstractTest {
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
- distributedConfigurationUpdater = new
DistributedConfigurationUpdater();
-
this.cmgManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
@@ -141,7 +136,6 @@ public class ItMetaStorageMultipleNodesTest extends
IgniteAbstractTest {
clusterStateStorage,
logicalTopology,
cmgConfiguration,
- distributedConfigurationUpdater,
nodeAttributes
);
@@ -163,8 +157,7 @@ public class ItMetaStorageMultipleNodesTest extends
IgniteAbstractTest {
raftManager,
clusterStateStorage,
cmgManager,
- metaStorageManager,
- distributedConfigurationUpdater
+ metaStorageManager
);
components.forEach(IgniteComponent::start);
@@ -183,8 +176,7 @@ public class ItMetaStorageMultipleNodesTest extends
IgniteAbstractTest {
raftManager,
clusterStateStorage,
clusterService,
- vaultManager,
- distributedConfigurationUpdater
+ vaultManager
);
Stream<AutoCloseable> beforeNodeStop = components.stream().map(c
-> c::beforeNodeStop);
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index 9c1aa8696d..2523690d82 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -38,7 +38,6 @@ import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
import
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
@@ -91,8 +90,6 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
private final ClusterManagementGroupManager cmgManager;
- private final DistributedConfigurationUpdater
distributedConfigurationUpdater;
-
Node(ClusterService clusterService, Path dataPath) {
var vaultManager = new VaultManager(new InMemoryVaultService());
@@ -120,10 +117,6 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
- distributedConfigurationUpdater = new
DistributedConfigurationUpdater();
-
- components.add(distributedConfigurationUpdater);
-
this.cmgManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
@@ -131,7 +124,6 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
clusterStateStorage,
logicalTopology,
cmgConfiguration,
- distributedConfigurationUpdater,
nodeAttributes
);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 44321fddf9..cc39c3ec57 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -37,7 +37,6 @@ import
org.apache.ignite.configuration.annotation.ConfigurationRoot;
import org.apache.ignite.configuration.annotation.ConfigurationType;
import org.apache.ignite.configuration.annotation.Value;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
import
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
@@ -113,8 +112,6 @@ public class ItDistributedConfigurationPropertiesTest {
private final ConfigurationManager distributedCfgManager;
- private final DistributedConfigurationUpdater
distributedConfigurationUpdater;
-
/** Flag that disables storage updates. */
private volatile boolean receivesUpdates = true;
@@ -142,8 +139,6 @@ public class ItDistributedConfigurationPropertiesTest {
var clusterStateStorage = new TestClusterStateStorage();
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
- distributedConfigurationUpdater = new
DistributedConfigurationUpdater();
-
cmgManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
@@ -151,7 +146,6 @@ public class ItDistributedConfigurationPropertiesTest {
clusterStateStorage,
logicalTopology,
clusterManagementConfiguration,
- distributedConfigurationUpdater,
nodeAttributes
);
@@ -207,7 +201,7 @@ public class ItDistributedConfigurationPropertiesTest {
void start() {
vaultManager.start();
- Stream.of(clusterService, raftManager, cmgManager,
metaStorageManager, distributedConfigurationUpdater)
+ Stream.of(clusterService, raftManager, cmgManager,
metaStorageManager)
.forEach(IgniteComponent::start);
// deploy watches to propagate data from the metastore into the
vault
@@ -230,8 +224,7 @@ public class ItDistributedConfigurationPropertiesTest {
metaStorageManager,
raftManager,
clusterService,
- vaultManager,
- distributedConfigurationUpdater
+ vaultManager
);
for (IgniteComponent igniteComponent : components) {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index fa398ab430..e1ca5afc7c 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -33,7 +33,6 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
import
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
@@ -92,8 +91,6 @@ public class ItDistributedConfigurationStorageTest {
private final DistributedConfigurationStorage cfgStorage;
- private final DistributedConfigurationUpdater
distributedConfigurationUpdater;
-
/**
* Constructor that simply creates a subset of components of this node.
*/
@@ -115,8 +112,6 @@ public class ItDistributedConfigurationStorageTest {
var clusterStateStorage = new TestClusterStateStorage();
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
- distributedConfigurationUpdater = new
DistributedConfigurationUpdater();
-
cmgManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
@@ -124,7 +119,6 @@ public class ItDistributedConfigurationStorageTest {
clusterStateStorage,
logicalTopology,
clusterManagementConfiguration,
- distributedConfigurationUpdater,
nodeAttributes
);
@@ -147,7 +141,7 @@ public class ItDistributedConfigurationStorageTest {
void start() throws Exception {
vaultManager.start();
- Stream.of(clusterService, raftManager, cmgManager,
metaStorageManager, distributedConfigurationUpdater)
+ Stream.of(clusterService, raftManager, cmgManager,
metaStorageManager)
.forEach(IgniteComponent::start);
// this is needed to avoid assertion errors
@@ -172,7 +166,7 @@ public class ItDistributedConfigurationStorageTest {
*/
void stop() throws Exception {
var components =
- List.of(metaStorageManager, cmgManager, raftManager,
clusterService, vaultManager, distributedConfigurationUpdater);
+ List.of(metaStorageManager, cmgManager, raftManager,
clusterService, vaultManager);
for (IgniteComponent igniteComponent : components) {
igniteComponent.beforeNodeStop();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 32ec34b698..371362c38d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -72,7 +72,6 @@ import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogServiceImpl;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
import
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
@@ -589,8 +588,6 @@ public class ItRebalanceDistributedTest {
private final CatalogManager catalogManager;
- private final DistributedConfigurationUpdater
distributedConfigurationUpdater;
-
private List<IgniteComponent> nodeComponents;
private final ConfigurationTreeGenerator generator;
@@ -639,8 +636,6 @@ public class ItRebalanceDistributedTest {
var clusterStateStorage = new TestClusterStateStorage();
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
- distributedConfigurationUpdater = new
DistributedConfigurationUpdater();
-
cmgManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
@@ -648,7 +643,6 @@ public class ItRebalanceDistributedTest {
clusterStateStorage,
logicalTopology,
clusterManagementConfiguration,
- distributedConfigurationUpdater,
nodeAttributes
);
@@ -835,8 +829,7 @@ public class ItRebalanceDistributedTest {
baselineMgr,
dataStorageMgr,
schemaManager,
- tableManager,
- distributedConfigurationUpdater
+ tableManager
);
nodeComponents.forEach(IgniteComponent::start);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 4a35db44ed..a21247c5ac 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -62,7 +62,6 @@ import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.catalog.CatalogServiceImpl;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
import
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import
org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
@@ -292,8 +291,6 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
- var distributedConfigurationUpdater = new
DistributedConfigurationUpdater();
-
var cmgManager = new ClusterManagementGroupManager(
vault,
clusterSvc,
@@ -301,7 +298,6 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
clusterStateStorage,
logicalTopology,
clusterManagementConfiguration,
- distributedConfigurationUpdater,
nodeAttributes
);
@@ -449,7 +445,6 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
replicaMgr,
txManager,
metaStorageMgr,
- distributedConfigurationUpdater,
clusterCfgMgr,
dataStorageManager,
catalogManager,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 3948610af4..ad1acdee1a 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogServiceImpl;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
import
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
@@ -63,6 +62,7 @@ import
org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.ConfigurationModules;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
+import
org.apache.ignite.internal.configuration.DistributedConfigurationUpdater;
import org.apache.ignite.internal.configuration.SecurityConfiguration;
import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
import org.apache.ignite.internal.configuration.presentation.HoconPresentation;
@@ -364,8 +364,6 @@ public class IgniteImpl implements Ignite {
var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
- distributedConfigurationUpdater = new
DistributedConfigurationUpdater();
-
cmgMgr = new ClusterManagementGroupManager(
vaultMgr,
clusterSvc,
@@ -373,7 +371,6 @@ public class IgniteImpl implements Ignite {
clusterStateStorage,
logicalTopology,
nodeConfigRegistry.getConfiguration(ClusterManagementConfiguration.KEY),
- distributedConfigurationUpdater,
nodeConfigRegistry.getConfiguration(NodeAttributesConfiguration.KEY)
);
@@ -406,6 +403,12 @@ public class IgniteImpl implements Ignite {
modules.distributed().polymorphicSchemaExtensions()
);
+
+ distributedConfigurationUpdater = new DistributedConfigurationUpdater(
+ cmgMgr,
+ new HoconPresentation(clusterCfgMgr.configurationRegistry())
+ );
+
ConfigurationRegistry clusterConfigRegistry =
clusterCfgMgr.configurationRegistry();
TablesConfiguration tablesConfiguration =
clusterConfigRegistry.getConfiguration(TablesConfiguration.KEY);
@@ -725,8 +728,11 @@ public class IgniteImpl implements Ignite {
}, startupExecutor);
}, startupExecutor)
.thenRunAsync(() -> {
- HoconPresentation presentation = new
HoconPresentation(clusterCfgMgr.configurationRegistry());
-
distributedConfigurationUpdater.setDistributedConfigurationPresentation(presentation);
+ try {
+
lifecycleManager.startComponent(distributedConfigurationUpdater);
+ } catch (NodeStoppingException e) {
+ throw new CompletionException(e);
+ }
}, startupExecutor)
// Signal that local recovery is complete and the node is
ready to join the cluster.
.thenComposeAsync(v -> {
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/DistributedConfigurationUpdater.java
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java
similarity index 52%
rename from
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/DistributedConfigurationUpdater.java
rename to
modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java
index 3dd0c9871c..6a6447fab5 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/DistributedConfigurationUpdater.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java
@@ -15,14 +15,13 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.cluster.management;
+package org.apache.ignite.internal.configuration;
-import java.util.concurrent.CompletableFuture;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.lang.NodeStoppingException;
/**
* Updater is responsible for applying changes to the cluster configuration
when it's ready.
@@ -31,37 +30,35 @@ public class DistributedConfigurationUpdater implements
IgniteComponent {
private static final IgniteLogger LOG =
Loggers.forClass(DistributedConfigurationUpdater.class);
- private final CompletableFuture<ConfigurationPresentation<String>>
clusterCfgPresentation = new CompletableFuture<>();
+ private final ClusterManagementGroupManager cmgMgr;
- public void
setDistributedConfigurationPresentation(ConfigurationPresentation<String>
presentation) {
- clusterCfgPresentation.complete(presentation);
- }
+ private final ConfigurationPresentation<String> presentation;
- /**
- * Applies changes to the cluster configuration when {@link
DistributedConfigurationUpdater#clusterCfgPresentation}
- * is complete.
- *
- * @param configurationToApply Cluster configuration that should be
applied.
- * @return Future that will be completed when cluster configuration is
updated.
- */
- public CompletableFuture<Void> updateConfiguration(String
configurationToApply) {
- return clusterCfgPresentation.thenCompose(presentation ->
presentation.update(configurationToApply))
- .whenComplete((v, e) -> {
- if (e != null) {
- LOG.error("Unable to update cluster configuration", e);
- } else {
- LOG.info("Cluster configuration updated successfully");
- }
- });
+ public DistributedConfigurationUpdater(ClusterManagementGroupManager
cmgMgr, ConfigurationPresentation<String> presentation) {
+ this.cmgMgr = cmgMgr;
+ this.presentation = presentation;
}
@Override
public void start() {
-
+ cmgMgr.clusterConfigurationToUpdate()
+ .thenAccept(action -> {
+ if (action.configuration() != null) {
+ presentation.update(action.configuration())
+ .thenApply(ignored -> action)
+ .thenCompose(it -> it.nextAction().get())
+ .whenComplete((v, e) -> {
+ if (e != null) {
+ LOG.error("Failed to update the
distributed configuration", e);
+ } else {
+ LOG.info("Distributed configuration is
updated");
+ }
+ });
+ }
+ });
}
@Override
public void stop() throws Exception {
- clusterCfgPresentation.completeExceptionally(new
NodeStoppingException("Component is stopped."));
}
}
diff --git
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java
new file mode 100644
index 0000000000..79542d897c
--- /dev/null
+++
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import
org.apache.ignite.internal.cluster.management.UpdateDistributedConfigurationAction;
+import
org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DistributedConfigurationUpdaterTest {
+
+ @Mock
+ public ConfigurationPresentation<String> presentation;
+
+ @Mock
+ public ClusterManagementGroupManager cmgMgr;
+
+ @Test
+ public void nextActionIsCompletedAfterUpdatingConfiguration() {
+
+ // Set up mocks.
+
when(presentation.update(anyString())).thenReturn(completedFuture(null));
+
+ CompletableFuture<Void> nextAction = new CompletableFuture<>();
+ String configuration = "security.authentication.enabled:true";
+ UpdateDistributedConfigurationAction
updateDistributedConfigurationAction =
+ new UpdateDistributedConfigurationAction(
+ configuration,
+ () -> {
+ nextAction.complete(null);
+ return nextAction;
+ }
+ );
+
+ when(cmgMgr.clusterConfigurationToUpdate())
+
.thenReturn(completedFuture(updateDistributedConfigurationAction));
+
+ // Run updater.
+ DistributedConfigurationUpdater distributedConfigurationUpdater = new
DistributedConfigurationUpdater(
+ cmgMgr,
+ presentation
+ );
+
+ distributedConfigurationUpdater.start();
+
+ // Verify that configuration was updated.
+ verify(presentation, times(1)).update(configuration);
+
+ // Verify that next action is completed.
+ assertThat(nextAction, willCompleteSuccessfully());
+ }
+
+ @Test
+ public void nextActionIsCompletedIfConfigurationNull() {
+
+ // Set up mocks.
+ CompletableFuture<Void> nextAction = new CompletableFuture<>();
+ UpdateDistributedConfigurationAction
updateDistributedConfigurationAction =
+ new UpdateDistributedConfigurationAction(
+ null,
+ () -> {
+ nextAction.complete(null);
+ return nextAction;
+ }
+ );
+
+ when(cmgMgr.clusterConfigurationToUpdate())
+
.thenReturn(completedFuture(updateDistributedConfigurationAction));
+
+ // Run updater.
+ DistributedConfigurationUpdater distributedConfigurationUpdater = new
DistributedConfigurationUpdater(
+ cmgMgr,
+ presentation
+ );
+
+ distributedConfigurationUpdater.start();
+
+ // Verify that configuration wasn't updated.
+ verify(presentation, never()).update(any());
+
+ // Verify that next action is not completed.
+ assertThat(nextAction, willTimeoutFast());
+ }
+}