This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bab4e35a89 IGNITE-19464 Move out logic of applying cluster 
configuration from ClusterManagementGroupManager (#2066)
bab4e35a89 is described below

commit bab4e35a89bb117a3f599a0126fc179a37a84a0c
Author: Ivan Gagarkin <[email protected]>
AuthorDate: Thu May 18 19:43:38 2023 +0400

    IGNITE-19464 Move out logic of applying cluster configuration from 
ClusterManagementGroupManager (#2066)
---
 modules/cluster-management/build.gradle            |   2 +
 .../cluster/management/ItClusterManagerTest.java   |  91 ++++++++++++----
 .../management/ClusterManagementGroupManager.java  |  82 ++++++++-------
 .../UpdateDistributedConfigurationAction.java      |  66 ++++++++++++
 .../internal/cluster/management/MockNode.java      |   4 -
 .../impl/ItMetaStorageMultipleNodesTest.java       |  12 +--
 .../metastorage/impl/ItMetaStorageWatchTest.java   |   8 --
 .../ItDistributedConfigurationPropertiesTest.java  |  11 +-
 .../ItDistributedConfigurationStorageTest.java     |  10 +-
 .../storage/ItRebalanceDistributedTest.java        |   9 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   5 -
 .../org/apache/ignite/internal/app/IgniteImpl.java |  18 ++--
 .../DistributedConfigurationUpdater.java           |  47 ++++-----
 .../DistributedConfigurationUpdaterTest.java       | 115 +++++++++++++++++++++
 14 files changed, 338 insertions(+), 142 deletions(-)

diff --git a/modules/cluster-management/build.gradle 
b/modules/cluster-management/build.gradle
index 6d1773910e..010cf2d9b4 100644
--- a/modules/cluster-management/build.gradle
+++ b/modules/cluster-management/build.gradle
@@ -69,4 +69,6 @@ dependencies {
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation 
testFixtures(project(':ignite-configuration'))
     integrationTestImplementation testFixtures(project(':ignite-network'))
+    integrationTestImplementation libs.awaitility
+    integrationTestImplementation libs.jetbrains.annotations
 }
diff --git 
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
 
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
index 2128d516c3..e57ce8eb8f 100644
--- 
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
+++ 
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
@@ -19,10 +19,12 @@ package org.apache.ignite.internal.cluster.management;
 
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -30,7 +32,9 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -42,6 +46,8 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
+import org.awaitility.Awaitility;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
@@ -334,16 +340,7 @@ public class ItClusterManagerTest extends 
BaseItClusterManagementTest {
         assertThat(node.clusterManager().joinFuture(), 
willCompleteSuccessfully());
 
         // Find the CMG leader and stop it
-        MockNode leaderNode = cluster.stream()
-                .filter(n -> {
-                    CompletableFuture<Boolean> isLeader = 
n.clusterManager().isCmgLeader();
-
-                    assertThat(isLeader, willCompleteSuccessfully());
-
-                    return isLeader.join();
-                })
-                .findAny()
-                .orElseThrow();
+        MockNode leaderNode = findLeaderNode(cluster).orElseThrow();
 
         stopNodes(List.of(leaderNode));
 
@@ -351,6 +348,45 @@ public class ItClusterManagerTest extends 
BaseItClusterManagementTest {
         assertThat(node.clusterManager().onJoinReady(), 
willCompleteSuccessfully());
     }
 
+    @Test
+    void 
testClusterConfigurationIsRemovedFromClusterStateAfterUpdating(TestInfo 
testInfo) throws Exception {
+        // Start a cluster of 3 nodes so that the CMG leader node could be 
stopped later.
+        startCluster(3, testInfo);
+
+        String[] cmgNodes = clusterNodeNames();
+
+        // Start the CMG on all 3 nodes.
+        String clusterConfiguration = "security.authentication.enabled:true";
+        initCluster(cmgNodes, cmgNodes, clusterConfiguration);
+
+        // Find the CMG leader; it is stopped later in this test.
+        MockNode leaderNode = findLeaderNode(cluster).orElseThrow();
+
+        // Read the cluster configuration to apply from the cluster state; it is removed in the next step.
+        UpdateDistributedConfigurationAction leaderAction = 
leaderNode.clusterManager()
+                .clusterConfigurationToUpdate()
+                .get();
+
+        // Check the leader has configuration.
+        assertEquals(clusterConfiguration, leaderAction.configuration());
+
+        // Execute the next action (remove the configuration from the cluster 
state)
+        assertThat(leaderAction.nextAction().get(), 
willCompleteSuccessfully());
+
+        // Stop the cluster leader to check the new leader is not going to 
update the configuration.
+        stopNodes(List.of(leaderNode));
+        cluster.remove(leaderNode);
+
+        // Wait for a new leader to be elected.
+        Awaitility.await().until(() -> findLeaderNode(cluster).isPresent());
+
+        // Find the new CMG leader.
+        MockNode newLeaderNode = findLeaderNode(cluster).orElseThrow();
+
+        // Check the new leader cancels the action.
+        
assertThat(newLeaderNode.clusterManager().clusterConfigurationToUpdate(), 
willThrow(CancellationException.class));
+    }
+
     @Test
     void testLeaderChangeBeforeJoin(TestInfo testInfo) throws Exception {
         // Start a cluster of 3 nodes so that the CMG leader node could be 
stopped later.
@@ -362,16 +398,7 @@ public class ItClusterManagerTest extends 
BaseItClusterManagementTest {
         initCluster(cmgNodes, cmgNodes);
 
         // Find the CMG leader and stop it
-        MockNode leaderNode = cluster.stream()
-                .filter(n -> {
-                    CompletableFuture<Boolean> isLeader = 
n.clusterManager().isCmgLeader();
-
-                    assertThat(isLeader, willCompleteSuccessfully());
-
-                    return isLeader.join();
-                })
-                .findAny()
-                .orElseThrow();
+        MockNode leaderNode = findLeaderNode(cluster).orElseThrow();
 
         stopNodes(List.of(leaderNode));
 
@@ -414,6 +441,18 @@ public class ItClusterManagerTest extends 
BaseItClusterManagementTest {
         assertTrue(waitForCondition(() -> 
nonCmgTopology.getLogicalTopology().nodes().size() == 2, 10_000));
     }
 
+    private Optional<MockNode> findLeaderNode(List<MockNode> cluster) {
+        return cluster.stream()
+                .filter(n -> {
+                    CompletableFuture<Boolean> isLeader = 
n.clusterManager().isCmgLeader();
+
+                    assertThat(isLeader, willCompleteSuccessfully());
+
+                    return isLeader.join();
+                })
+                .findAny();
+    }
+
     private List<ClusterNode> currentPhysicalTopology() {
         return cluster.stream()
                 .map(MockNode::localMember)
@@ -440,11 +479,21 @@ public class ItClusterManagerTest extends 
BaseItClusterManagementTest {
         }, 10000));
     }
 
+
     private void initCluster(String[] metaStorageNodes, String[] cmgNodes) 
throws NodeStoppingException {
+        initCluster(metaStorageNodes, cmgNodes, null);
+    }
+
+    private void initCluster(
+            String[] metaStorageNodes,
+            String[] cmgNodes,
+            @Nullable String clusterConfiguration
+    ) throws NodeStoppingException {
         cluster.get(0).clusterManager().initCluster(
                 Arrays.asList(metaStorageNodes),
                 Arrays.asList(cmgNodes),
-                "cluster"
+                "cluster",
+                clusterConfiguration
         );
 
         for (MockNode node : cluster) {
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index b59c518be2..25374c2d95 100644
--- 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -107,6 +107,10 @@ public class ClusterManagementGroupManager implements 
IgniteComponent {
      */
     private final CompletableFuture<Void> joinFuture = new 
CompletableFuture<>();
 
+    // TODO: IGNITE-19489 Cancel updateDistributedConfigurationActionFuture if 
the configuration is applied
+    private final CompletableFuture<UpdateDistributedConfigurationAction> 
updateDistributedConfigurationActionFuture =
+            new CompletableFuture<>();
+
     /** Message factory. */
     private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
 
@@ -130,8 +134,6 @@ public class ClusterManagementGroupManager implements 
IgniteComponent {
     /** Handles cluster initialization flow. */
     private final ClusterInitializer clusterInitializer;
 
-    private final DistributedConfigurationUpdater 
distributedConfigurationUpdater;
-
     /** Node's attributes configuration. */
     private final NodeAttributesConfiguration nodeAttributes;
 
@@ -143,7 +145,6 @@ public class ClusterManagementGroupManager implements 
IgniteComponent {
             ClusterStateStorage clusterStateStorage,
             LogicalTopology logicalTopology,
             ClusterManagementConfiguration configuration,
-            DistributedConfigurationUpdater distributedConfigurationUpdater,
             NodeAttributesConfiguration nodeAttributes
     ) {
         this.clusterService = clusterService;
@@ -151,8 +152,6 @@ public class ClusterManagementGroupManager implements 
IgniteComponent {
         this.clusterStateStorage = clusterStateStorage;
         this.logicalTopology = logicalTopology;
         this.configuration = configuration;
-        this.distributedConfigurationUpdater = distributedConfigurationUpdater;
-
         this.localStateStorage = new LocalStateStorage(vault);
         this.clusterInitializer = new ClusterInitializer(clusterService);
         this.nodeAttributes = nodeAttributes;
@@ -389,45 +388,47 @@ public class ClusterManagementGroupManager implements 
IgniteComponent {
                     }
                 });
 
-        raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
-    }
-
-    private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService 
service) {
-        return service.readClusterState()
-                .thenCompose(state -> {
-                    if (state == null) {
-                        LOG.info("No CMG state found in the Raft service");
-                        return completedFuture(null);
-                    } else if (state.clusterConfigurationToApply() == null) {
-                        // Config was applied or wasn't provided
-                        LOG.info("No cluster configuration found in the Raft 
service");
-                        return completedFuture(null);
+        raftServiceAfterJoin().thenCompose(service -> 
service.readClusterState()
+                .whenComplete((state, e) -> {
+                    if (e != null) {
+                        LOG.error("Error when retrieving cluster 
configuration", e);
+                        
updateDistributedConfigurationActionFuture.completeExceptionally(e);
                     } else {
-                        LOG.info("Cluster configuration is found in the Raft 
service, going to apply it");
-                        return 
distributedConfigurationUpdater.updateConfiguration(state.clusterConfigurationToApply())
-                                .thenCompose(unused -> 
removeClusterConfigFromClusterState(service));
+                        String configuration = 
state.clusterConfigurationToApply();
+                        if (configuration != null) {
+                            
updateDistributedConfigurationActionFuture.complete(
+                                    new UpdateDistributedConfigurationAction(
+                                            configuration,
+                                            () -> 
removeClusterConfigFromClusterState(service)
+                                    ));
+                        } else {
+                            
updateDistributedConfigurationActionFuture.cancel(true);
+                        }
                     }
-                });
+                })
+        );
     }
 
     private CompletableFuture<Void> 
removeClusterConfigFromClusterState(CmgRaftService service) {
         return service.readClusterState()
                 .thenCompose(state -> {
-                    Collection<String> cmgNodes = state.cmgNodes();
-                    Collection<String> msNodes = state.metaStorageNodes();
-                    IgniteProductVersion igniteVersion = state.igniteVersion();
-                    ClusterTag clusterTag = state.clusterTag();
-                    ClusterState clusterState = msgFactory.clusterState()
-                            .cmgNodes(Set.copyOf(cmgNodes))
-                            .metaStorageNodes(Set.copyOf(msNodes))
-                            .version(igniteVersion.toString())
-                            .clusterTag(clusterTag)
-                            .build();
-                    return service.updateClusterState(clusterState);
-                })
-                .whenComplete((v, e) -> {
-                    if (e != null) {
-                        LOG.warn("Error when removing cluster configuration", 
e);
+                    if (state.clusterConfigurationToApply() != null) {
+                        ClusterState clusterState = msgFactory.clusterState()
+                                .cmgNodes(Set.copyOf(state.cmgNodes()))
+                                
.metaStorageNodes(Set.copyOf(state.metaStorageNodes()))
+                                .version(state.igniteVersion().toString())
+                                .clusterTag(state.clusterTag())
+                                .build();
+                        return service.updateClusterState(clusterState)
+                                .whenComplete((v, e) -> {
+                                    if (e != null) {
+                                        LOG.error("Error when removing 
configuration from cluster state", e);
+                                    } else {
+                                        LOG.info("Cluster configuration is 
removed from cluster state");
+                                    }
+                                });
+                    } else {
+                        return completedFuture(null);
                     }
                 });
     }
@@ -718,8 +719,9 @@ public class ClusterManagementGroupManager implements 
IgniteComponent {
 
         raftManager.stopRaftNodes(CmgGroupId.INSTANCE);
 
-        // Fail the future to unblock dependent operations
+        // Fail the futures to unblock dependent operations
         joinFuture.completeExceptionally(new NodeStoppingException());
+        updateDistributedConfigurationActionFuture.completeExceptionally(new 
NodeStoppingException());
     }
 
     /**
@@ -828,6 +830,10 @@ public class ClusterManagementGroupManager implements 
IgniteComponent {
         }
     }
 
+    public CompletableFuture<UpdateDistributedConfigurationAction> 
clusterConfigurationToUpdate() {
+        return updateDistributedConfigurationActionFuture;
+    }
+
     /**
      * Returns a future that resolves to {@code true} if the current node is 
the CMG leader.
      */
diff --git 
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java
 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java
new file mode 100644
index 0000000000..9503616e1f
--- /dev/null
+++ 
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+/**
+ * Action to update the distributed configuration.
+ */
+public class UpdateDistributedConfigurationAction {
+
+    /**
+     * Configuration that should be applied.
+     */
+    private final String configuration;
+
+    /**
+     * The action to execute next, once the configuration has been applied.
+     */
+    private final Supplier<CompletableFuture<Void>> nextAction;
+
+    /**
+     * Constructor.
+     *
+     * @param configuration the configuration.
+     * @param nextAction the next action.
+     */
+    public UpdateDistributedConfigurationAction(String configuration, 
Supplier<CompletableFuture<Void>> nextAction) {
+        this.configuration = configuration;
+        this.nextAction = nextAction;
+    }
+
+    /**
+     * Returns the configuration.
+     *
+     * @return the configuration.
+     */
+    public String configuration() {
+        return configuration;
+    }
+
+    /**
+     * Returns the next action to execute.
+     *
+     * @return the next action.
+     */
+    public Supplier<CompletableFuture<Void>> nextAction() {
+        return nextAction;
+    }
+}
diff --git 
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
 
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 1808ca1e1f..e3a6dded4d 100644
--- 
a/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++ 
b/modules/cluster-management/src/testFixtures/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -110,8 +110,6 @@ public class MockNode {
 
         var logicalTopologyService = new 
LogicalTopologyImpl(clusterStateStorage);
 
-        var distributedConfigurationUpdater = new 
DistributedConfigurationUpdater();
-
         this.clusterManager = new ClusterManagementGroupManager(
                 vaultManager,
                 clusterService,
@@ -119,7 +117,6 @@ public class MockNode {
                 clusterStateStorage,
                 logicalTopologyService,
                 cmgConfiguration,
-                distributedConfigurationUpdater,
                 nodeAttributes
         );
 
@@ -127,7 +124,6 @@ public class MockNode {
         components.add(clusterService);
         components.add(raftManager);
         components.add(clusterStateStorage);
-        components.add(distributedConfigurationUpdater);
         components.add(clusterManager);
     }
 
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
index 045cb88766..a817026ba3 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
@@ -40,7 +40,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import 
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
 import 
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
 import 
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
 import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
@@ -113,8 +112,6 @@ public class ItMetaStorageMultipleNodesTest extends 
IgniteAbstractTest {
 
         private final MetaStorageManagerImpl metaStorageManager;
 
-        private final DistributedConfigurationUpdater 
distributedConfigurationUpdater;
-
         Node(ClusterService clusterService, Path dataPath) {
             this.clusterService = clusterService;
 
@@ -132,8 +129,6 @@ public class ItMetaStorageMultipleNodesTest extends 
IgniteAbstractTest {
 
             var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
 
-            distributedConfigurationUpdater = new 
DistributedConfigurationUpdater();
-
             this.cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
                     clusterService,
@@ -141,7 +136,6 @@ public class ItMetaStorageMultipleNodesTest extends 
IgniteAbstractTest {
                     clusterStateStorage,
                     logicalTopology,
                     cmgConfiguration,
-                    distributedConfigurationUpdater,
                     nodeAttributes
             );
 
@@ -163,8 +157,7 @@ public class ItMetaStorageMultipleNodesTest extends 
IgniteAbstractTest {
                     raftManager,
                     clusterStateStorage,
                     cmgManager,
-                    metaStorageManager,
-                    distributedConfigurationUpdater
+                    metaStorageManager
             );
 
             components.forEach(IgniteComponent::start);
@@ -183,8 +176,7 @@ public class ItMetaStorageMultipleNodesTest extends 
IgniteAbstractTest {
                     raftManager,
                     clusterStateStorage,
                     clusterService,
-                    vaultManager,
-                    distributedConfigurationUpdater
+                    vaultManager
             );
 
             Stream<AutoCloseable> beforeNodeStop = components.stream().map(c 
-> c::beforeNodeStop);
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index 9c1aa8696d..2523690d82 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -38,7 +38,6 @@ import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import 
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
 import 
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
 import 
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
 import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
@@ -91,8 +90,6 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
 
         private final ClusterManagementGroupManager cmgManager;
 
-        private final DistributedConfigurationUpdater 
distributedConfigurationUpdater;
-
         Node(ClusterService clusterService, Path dataPath) {
             var vaultManager = new VaultManager(new InMemoryVaultService());
 
@@ -120,10 +117,6 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
 
             var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
 
-            distributedConfigurationUpdater = new 
DistributedConfigurationUpdater();
-
-            components.add(distributedConfigurationUpdater);
-
             this.cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
                     clusterService,
@@ -131,7 +124,6 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
                     clusterStateStorage,
                     logicalTopology,
                     cmgConfiguration,
-                    distributedConfigurationUpdater,
                     nodeAttributes
             );
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index 44321fddf9..cc39c3ec57 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -37,7 +37,6 @@ import 
org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
 import org.apache.ignite.configuration.annotation.Value;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import 
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
 import 
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
 import 
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
 import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
@@ -113,8 +112,6 @@ public class ItDistributedConfigurationPropertiesTest {
 
         private final ConfigurationManager distributedCfgManager;
 
-        private final DistributedConfigurationUpdater 
distributedConfigurationUpdater;
-
         /** Flag that disables storage updates. */
         private volatile boolean receivesUpdates = true;
 
@@ -142,8 +139,6 @@ public class ItDistributedConfigurationPropertiesTest {
             var clusterStateStorage = new TestClusterStateStorage();
             var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
 
-            distributedConfigurationUpdater = new 
DistributedConfigurationUpdater();
-
             cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
                     clusterService,
@@ -151,7 +146,6 @@ public class ItDistributedConfigurationPropertiesTest {
                     clusterStateStorage,
                     logicalTopology,
                     clusterManagementConfiguration,
-                    distributedConfigurationUpdater,
                     nodeAttributes
             );
 
@@ -207,7 +201,7 @@ public class ItDistributedConfigurationPropertiesTest {
         void start() {
             vaultManager.start();
 
-            Stream.of(clusterService, raftManager, cmgManager, 
metaStorageManager, distributedConfigurationUpdater)
+            Stream.of(clusterService, raftManager, cmgManager, 
metaStorageManager)
                     .forEach(IgniteComponent::start);
 
             // deploy watches to propagate data from the metastore into the 
vault
@@ -230,8 +224,7 @@ public class ItDistributedConfigurationPropertiesTest {
                     metaStorageManager,
                     raftManager,
                     clusterService,
-                    vaultManager,
-                    distributedConfigurationUpdater
+                    vaultManager
             );
 
             for (IgniteComponent igniteComponent : components) {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index fa398ab430..e1ca5afc7c 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -33,7 +33,6 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import 
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
 import 
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
 import 
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
 import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
@@ -92,8 +91,6 @@ public class ItDistributedConfigurationStorageTest {
 
         private final DistributedConfigurationStorage cfgStorage;
 
-        private final DistributedConfigurationUpdater 
distributedConfigurationUpdater;
-
         /**
          * Constructor that simply creates a subset of components of this node.
          */
@@ -115,8 +112,6 @@ public class ItDistributedConfigurationStorageTest {
             var clusterStateStorage = new TestClusterStateStorage();
             var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
 
-            distributedConfigurationUpdater = new 
DistributedConfigurationUpdater();
-
             cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
                     clusterService,
@@ -124,7 +119,6 @@ public class ItDistributedConfigurationStorageTest {
                     clusterStateStorage,
                     logicalTopology,
                     clusterManagementConfiguration,
-                    distributedConfigurationUpdater,
                     nodeAttributes
             );
 
@@ -147,7 +141,7 @@ public class ItDistributedConfigurationStorageTest {
         void start() throws Exception {
             vaultManager.start();
 
-            Stream.of(clusterService, raftManager, cmgManager, 
metaStorageManager, distributedConfigurationUpdater)
+            Stream.of(clusterService, raftManager, cmgManager, 
metaStorageManager)
                     .forEach(IgniteComponent::start);
 
             // this is needed to avoid assertion errors
@@ -172,7 +166,7 @@ public class ItDistributedConfigurationStorageTest {
          */
         void stop() throws Exception {
             var components =
-                    List.of(metaStorageManager, cmgManager, raftManager, 
clusterService, vaultManager, distributedConfigurationUpdater);
+                    List.of(metaStorageManager, cmgManager, raftManager, 
clusterService, vaultManager);
 
             for (IgniteComponent igniteComponent : components) {
                 igniteComponent.beforeNodeStop();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 32ec34b698..371362c38d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -72,7 +72,6 @@ import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogServiceImpl;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import 
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
 import 
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
 import 
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
 import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
@@ -589,8 +588,6 @@ public class ItRebalanceDistributedTest {
 
         private final CatalogManager catalogManager;
 
-        private final DistributedConfigurationUpdater 
distributedConfigurationUpdater;
-
         private List<IgniteComponent> nodeComponents;
 
         private final ConfigurationTreeGenerator generator;
@@ -639,8 +636,6 @@ public class ItRebalanceDistributedTest {
             var clusterStateStorage = new TestClusterStateStorage();
             var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
 
-            distributedConfigurationUpdater = new 
DistributedConfigurationUpdater();
-
             cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
                     clusterService,
@@ -648,7 +643,6 @@ public class ItRebalanceDistributedTest {
                     clusterStateStorage,
                     logicalTopology,
                     clusterManagementConfiguration,
-                    distributedConfigurationUpdater,
                     nodeAttributes
             );
 
@@ -835,8 +829,7 @@ public class ItRebalanceDistributedTest {
                     baselineMgr,
                     dataStorageMgr,
                     schemaManager,
-                    tableManager,
-                    distributedConfigurationUpdater
+                    tableManager
             );
 
             nodeComponents.forEach(IgniteComponent::start);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 4a35db44ed..a21247c5ac 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -62,7 +62,6 @@ import org.apache.ignite.internal.baseline.BaselineManager;
 import org.apache.ignite.internal.catalog.CatalogServiceImpl;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import 
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
 import 
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
 import 
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
 import 
org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
@@ -292,8 +291,6 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
 
         var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
 
-        var distributedConfigurationUpdater = new 
DistributedConfigurationUpdater();
-
         var cmgManager = new ClusterManagementGroupManager(
                 vault,
                 clusterSvc,
@@ -301,7 +298,6 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
                 clusterStateStorage,
                 logicalTopology,
                 clusterManagementConfiguration,
-                distributedConfigurationUpdater,
                 nodeAttributes
         );
 
@@ -449,7 +445,6 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
                 replicaMgr,
                 txManager,
                 metaStorageMgr,
-                distributedConfigurationUpdater,
                 clusterCfgMgr,
                 dataStorageManager,
                 catalogManager,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 3948610af4..ad1acdee1a 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogServiceImpl;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import 
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
 import 
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
 import 
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration;
 import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
@@ -63,6 +62,7 @@ import 
org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.ConfigurationModules;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
+import 
org.apache.ignite.internal.configuration.DistributedConfigurationUpdater;
 import org.apache.ignite.internal.configuration.SecurityConfiguration;
 import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
 import org.apache.ignite.internal.configuration.presentation.HoconPresentation;
@@ -364,8 +364,6 @@ public class IgniteImpl implements Ignite {
 
         var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
 
-        distributedConfigurationUpdater = new 
DistributedConfigurationUpdater();
-
         cmgMgr = new ClusterManagementGroupManager(
                 vaultMgr,
                 clusterSvc,
@@ -373,7 +371,6 @@ public class IgniteImpl implements Ignite {
                 clusterStateStorage,
                 logicalTopology,
                 
nodeConfigRegistry.getConfiguration(ClusterManagementConfiguration.KEY),
-                distributedConfigurationUpdater,
                 
nodeConfigRegistry.getConfiguration(NodeAttributesConfiguration.KEY)
         );
 
@@ -406,6 +403,12 @@ public class IgniteImpl implements Ignite {
                 modules.distributed().polymorphicSchemaExtensions()
         );
 
+
+        distributedConfigurationUpdater = new DistributedConfigurationUpdater(
+                cmgMgr,
+                new HoconPresentation(clusterCfgMgr.configurationRegistry())
+        );
+
         ConfigurationRegistry clusterConfigRegistry = 
clusterCfgMgr.configurationRegistry();
 
         TablesConfiguration tablesConfiguration = 
clusterConfigRegistry.getConfiguration(TablesConfiguration.KEY);
@@ -725,8 +728,11 @@ public class IgniteImpl implements Ignite {
                                 }, startupExecutor);
                     }, startupExecutor)
                     .thenRunAsync(() -> {
-                        HoconPresentation presentation = new HoconPresentation(clusterCfgMgr.configurationRegistry());
-                        distributedConfigurationUpdater.setDistributedConfigurationPresentation(presentation);
+                        try {
+                            lifecycleManager.startComponent(distributedConfigurationUpdater);
+                        } catch (NodeStoppingException e) {
+                            throw new CompletionException(e);
+                        }
                     }, startupExecutor)
                     // Signal that local recovery is complete and the node is ready to join the cluster.
                     .thenComposeAsync(v -> {
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/DistributedConfigurationUpdater.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java
similarity index 52%
rename from modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/DistributedConfigurationUpdater.java
rename to modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java
index 3dd0c9871c..6a6447fab5 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/DistributedConfigurationUpdater.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.cluster.management;
+package org.apache.ignite.internal.configuration;
 
-import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.lang.NodeStoppingException;
 
 /**
  * Updater is responsible for applying changes to the cluster configuration when it's ready.
@@ -31,37 +30,35 @@ public class DistributedConfigurationUpdater implements IgniteComponent {
 
     private static final IgniteLogger LOG = Loggers.forClass(DistributedConfigurationUpdater.class);
 
-    private final CompletableFuture<ConfigurationPresentation<String>> clusterCfgPresentation = new CompletableFuture<>();
+    private final ClusterManagementGroupManager cmgMgr;
 
-    public void setDistributedConfigurationPresentation(ConfigurationPresentation<String> presentation) {
-        clusterCfgPresentation.complete(presentation);
-    }
+    private final ConfigurationPresentation<String> presentation;
 
-    /**
-     * Applies changes to the cluster configuration when {@link DistributedConfigurationUpdater#clusterCfgPresentation}
-     * is complete.
-     *
-     * @param configurationToApply Cluster configuration that should be applied.
-     * @return Future that will be completed when cluster configuration is updated.
-     */
-    public CompletableFuture<Void> updateConfiguration(String configurationToApply) {
-        return clusterCfgPresentation.thenCompose(presentation -> presentation.update(configurationToApply))
-                .whenComplete((v, e) -> {
-                    if (e != null) {
-                        LOG.error("Unable to update cluster configuration", e);
-                    } else {
-                        LOG.info("Cluster configuration updated successfully");
-                    }
-                });
+    public DistributedConfigurationUpdater(ClusterManagementGroupManager cmgMgr, ConfigurationPresentation<String> presentation) {
+        this.cmgMgr = cmgMgr;
+        this.presentation = presentation;
     }
 
     @Override
     public void start() {
-
+        cmgMgr.clusterConfigurationToUpdate()
+                .thenAccept(action -> {
+                    if (action.configuration() != null) {
+                        presentation.update(action.configuration())
+                                .thenApply(ignored -> action)
+                                .thenCompose(it -> it.nextAction().get())
+                                .whenComplete((v, e) -> {
+                                    if (e != null) {
+                                        LOG.error("Failed to update the distributed configuration", e);
+                                    } else {
+                                        LOG.info("Distributed configuration is updated");
+                                    }
+                                });
+                    }
+                });
     }
 
     @Override
     public void stop() throws Exception {
-        clusterCfgPresentation.completeExceptionally(new NodeStoppingException("Component is stopped."));
     }
 }
diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java
new file mode 100644
index 0000000000..79542d897c
--- /dev/null
+++ b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.UpdateDistributedConfigurationAction;
+import org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DistributedConfigurationUpdaterTest {
+
+    @Mock
+    public ConfigurationPresentation<String> presentation;
+
+    @Mock
+    public ClusterManagementGroupManager cmgMgr;
+
+    @Test
+    public void nextActionIsCompletedAfterUpdatingConfiguration() {
+
+        // Set up mocks.
+        when(presentation.update(anyString())).thenReturn(completedFuture(null));
+
+        CompletableFuture<Void> nextAction = new CompletableFuture<>();
+        String configuration = "security.authentication.enabled:true";
+        UpdateDistributedConfigurationAction updateDistributedConfigurationAction =
+                new UpdateDistributedConfigurationAction(
+                        configuration,
+                        () -> {
+                            nextAction.complete(null);
+                            return nextAction;
+                        }
+                );
+
+        when(cmgMgr.clusterConfigurationToUpdate())
+                .thenReturn(completedFuture(updateDistributedConfigurationAction));
+
+        // Run updater.
+        DistributedConfigurationUpdater distributedConfigurationUpdater = new DistributedConfigurationUpdater(
+                cmgMgr,
+                presentation
+        );
+
+        distributedConfigurationUpdater.start();
+
+        // Verify that configuration was updated.
+        verify(presentation, times(1)).update(configuration);
+
+        // Verify that next action is completed.
+        assertThat(nextAction, willCompleteSuccessfully());
+    }
+
+    @Test
+    public void nextActionIsCompletedIfConfigurationNull() {
+
+        // Set up mocks.
+        CompletableFuture<Void> nextAction = new CompletableFuture<>();
+        UpdateDistributedConfigurationAction updateDistributedConfigurationAction =
+                new UpdateDistributedConfigurationAction(
+                        null,
+                        () -> {
+                            nextAction.complete(null);
+                            return nextAction;
+                        }
+                );
+
+        when(cmgMgr.clusterConfigurationToUpdate())
+                .thenReturn(completedFuture(updateDistributedConfigurationAction));
+
+        // Run updater.
+        DistributedConfigurationUpdater distributedConfigurationUpdater = new DistributedConfigurationUpdater(
+                cmgMgr,
+                presentation
+        );
+
+        distributedConfigurationUpdater.start();
+
+        // Verify that configuration wasn't updated.
+        verify(presentation, never()).update(any());
+
+        // Verify that next action is not completed.
+        assertThat(nextAction, willTimeoutFast());
+    }
+}

Reply via email to