This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new c86649d19a IGNITE-20499 Broadcast user-provided initial cluster configuration with cluster state and save it to storage along with defaults (#2817) c86649d19a is described below commit c86649d19ab677799687ce3b7c8a1617ec597afd Author: Ivan Gagarkin <gagarkin....@gmail.com> AuthorDate: Mon Nov 13 17:30:43 2023 +0700 IGNITE-20499 Broadcast user-provided initial cluster configuration with cluster state and save it to storage along with defaults (#2817) --- .../cluster/management/ItClusterManagerTest.java | 67 ++++-------- .../management/ClusterManagementGroupManager.java | 73 ++++--------- .../internal/cluster/management/ClusterState.java | 2 +- .../UpdateDistributedConfigurationAction.java | 66 ------------ .../ConfigurationDynamicDefaultsPatcher.java | 2 +- modules/configuration/README.md | 37 +++++++ .../configuration/ConfigurationChanger.java | 42 +++++++- .../configuration/ConfigurationRegistry.java | 37 +++---- .../configuration/ConfigurationChangerTest.java | 22 ++++ .../org/apache/ignite/internal/app/IgniteImpl.java | 83 ++++++++------- .../DistributedConfigurationUpdater.java | 64 ------------ .../DistributedConfigurationUpdaterTest.java | 116 --------------------- 12 files changed, 204 insertions(+), 407 deletions(-) diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java index da91601235..b68bbd57f6 100644 --- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java +++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java @@ -19,14 +19,13 @@ package org.apache.ignite.internal.cluster.management; import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -36,7 +35,6 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -48,7 +46,6 @@ import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.network.ClusterNode; -import org.awaitility.Awaitility; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -102,6 +99,26 @@ public class ItClusterManagerTest extends BaseItClusterManagementTest { assertThat(cluster.get(1).logicalTopologyNodes(), will(containsInAnyOrder(expectedTopology))); } + /** + * Tests initial cluster setup with provided configuration. 
+ */ + @Test + void testInitWithProvidedConfiguration(TestInfo testInfo) throws Exception { + startCluster(3, testInfo); + + String[] cmgNodes = { cluster.get(0).name() }; + + String[] metaStorageNodes = { cluster.get(1).name() }; + + String configuration = "{security: {enabled: true}}"; + + initCluster(metaStorageNodes, cmgNodes, configuration); + + for (MockNode node : cluster) { + assertThat(node.clusterManager().initialClusterConfigurationFuture(), willBe(configuration)); + } + } + /** * Tests that init fails in case some nodes cannot be found. */ @@ -350,48 +367,6 @@ public class ItClusterManagerTest extends BaseItClusterManagementTest { assertThat(node.clusterManager().onJoinReady(), willCompleteSuccessfully()); } - @Test - void testClusterConfigurationIsRemovedFromClusterStateAfterUpdating(TestInfo testInfo) throws Exception { - // Start a cluster of 3 nodes so that the CMG leader node could be stopped later. - startCluster(3, testInfo); - - String[] cmgNodes = clusterNodeNames(); - - // Start the CMG on all 3 nodes. - String clusterConfiguration = "security.authentication.enabled:true"; - initCluster(cmgNodes, cmgNodes, clusterConfiguration); - - // Find the CMG leader and stop it. - MockNode leaderNode = findLeaderNode(cluster).orElseThrow(); - - // Read cluster configuration from the cluster state and remove it. - UpdateDistributedConfigurationAction leaderAction = leaderNode.clusterManager() - .clusterConfigurationToUpdate() - .get(); - - // Check the leader has configuration. - assertEquals(clusterConfiguration, leaderAction.configuration()); - - // Execute the next action (remove the configuration from the cluster state) - assertThat(leaderAction.nextAction().get(), willCompleteSuccessfully()); - - // Stop the cluster leader to check the new leader is not going to update the configuration. - stopNodes(List.of(leaderNode)); - cluster.remove(leaderNode); - - // Wait for a new leader to be elected. 
- MockNode newLeaderNode = Awaitility.await() - .timeout(60, TimeUnit.SECONDS) - .until(() -> findLeaderNode(cluster), Optional::isPresent) - .get(); - - // Check the new leader cancels the action. - assertThat( - newLeaderNode.clusterManager().clusterConfigurationToUpdate(), - willThrow(CancellationException.class, 5, TimeUnit.SECONDS) - ); - } - @Test void testLeaderChangeBeforeJoin(TestInfo testInfo) throws Exception { // Start a cluster of 3 nodes so that the CMG leader node could be stopped later. diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java index d880b2412a..cb071bc8a9 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java @@ -103,10 +103,6 @@ public class ClusterManagementGroupManager implements IgniteComponent { */ private final CompletableFuture<Void> joinFuture = new CompletableFuture<>(); - // TODO: IGNITE-19489 Cancel updateDistributedConfigurationActionFuture if the configuration is applied - private final CompletableFuture<UpdateDistributedConfigurationAction> updateDistributedConfigurationActionFuture = - new CompletableFuture<>(); - /** Message factory. */ private final CmgMessagesFactory msgFactory = new CmgMessagesFactory(); @@ -133,6 +129,9 @@ public class ClusterManagementGroupManager implements IgniteComponent { /** Local node's attributes. */ private final NodeAttributes nodeAttributes; + /** Future that resolves into the initial cluster configuration in HOCON format. */ + private final CompletableFuture<String> initialClusterConfigurationFuture = new CompletableFuture<>(); + /** Constructor. 
*/ public ClusterManagementGroupManager( VaultManager vault, @@ -356,6 +355,12 @@ public class ClusterManagementGroupManager implements IgniteComponent { private void onElectedAsLeader(long term) { LOG.info("CMG leader has been elected, executing onLeaderElected callback"); + // The cluster state is broadcast via the messaging service; hence, the future must be completed here on the leader node. + // TODO: This needs to be reworked following the implementation of IGNITE-18275. + raftServiceAfterJoin() + .thenCompose(CmgRaftService::readClusterState) + .thenAccept(state -> initialClusterConfigurationFuture.complete(state.initialClusterConfiguration())); + raftServiceAfterJoin() .thenCompose(this::updateLogicalTopology) .thenCompose(service -> service.updateLearners(term).thenApply(unused -> service)) @@ -384,50 +389,6 @@ public class ClusterManagementGroupManager implements IgniteComponent { LOG.info("onLeaderElected callback executed successfully"); } }); - - raftServiceAfterJoin().thenCompose(service -> service.readClusterState() - .whenComplete((state, e) -> { - if (e != null) { - LOG.error("Error when retrieving cluster configuration", e); - updateDistributedConfigurationActionFuture.completeExceptionally(e); - } else { - String configuration = state.initialClusterConfiguration(); - if (configuration != null) { - updateDistributedConfigurationActionFuture.complete( - new UpdateDistributedConfigurationAction( - configuration, - () -> removeClusterConfigFromClusterState(service) - )); - } else { - updateDistributedConfigurationActionFuture.cancel(true); - } - } - }) - ); - } - - private CompletableFuture<Void> removeClusterConfigFromClusterState(CmgRaftService service) { - return service.readClusterState() - .thenCompose(state -> { - if (state.initialClusterConfiguration() != null) { - ClusterState clusterState = msgFactory.clusterState() - .cmgNodes(Set.copyOf(state.cmgNodes())) - .metaStorageNodes(Set.copyOf(state.metaStorageNodes())) - 
.version(state.igniteVersion().toString()) - .clusterTag(state.clusterTag()) - .build(); - return service.updateClusterState(clusterState) - .whenComplete((v, e) -> { - if (e != null) { - LOG.error("Error when removing configuration from cluster state", e); - } else { - LOG.info("Cluster configuration is removed from cluster state"); - } - }); - } else { - return completedFuture(null); - } - }); } /** @@ -488,6 +449,9 @@ public class ClusterManagementGroupManager implements IgniteComponent { ClusterState state = msg.clusterState(); + // Complete the initialClusterConfigurationFuture to initialize the cluster configuration on the local node. + initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()); + synchronized (raftServiceLock) { if (raftService == null) { LOG.info("ClusterStateMessage received, starting the CMG [nodes={}]", state.cmgNodes()); @@ -718,7 +682,8 @@ public class ClusterManagementGroupManager implements IgniteComponent { // Fail the futures to unblock dependent operations joinFuture.completeExceptionally(new NodeStoppingException()); - updateDistributedConfigurationActionFuture.completeExceptionally(new NodeStoppingException()); + + initialClusterConfigurationFuture.completeExceptionally(new NodeStoppingException()); } /** @@ -828,8 +793,14 @@ public class ClusterManagementGroupManager implements IgniteComponent { } } - public CompletableFuture<UpdateDistributedConfigurationAction> clusterConfigurationToUpdate() { - return updateDistributedConfigurationActionFuture; + /** + * Returns a future resolving to the initial cluster configuration in HOCON format. The resulting configuration may be {@code null} if + * not provided by the user. + * + * @return a CompletableFuture that, upon completion, provides the initial cluster configuration, which may be {@code null}. 
+ */ + public CompletableFuture<String> initialClusterConfigurationFuture() { + return initialClusterConfigurationFuture; } /** diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java index 82a125cb3c..96bb8f8f5d 100644 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java +++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java @@ -64,7 +64,7 @@ public interface ClusterState extends NetworkMessage, Serializable { } /** - * Returns a cluster configuration that should be applied. + * Returns initial cluster configuration. */ @Nullable String initialClusterConfiguration(); diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java deleted file mode 100644 index 9503616e1f..0000000000 --- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.cluster.management; - -import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; - -/** - * Action to update the distributed configuration. - */ -public class UpdateDistributedConfigurationAction { - - /** - * Configuration that should be applied. - */ - private final String configuration; - - /** - * The next action to execute. - */ - private final Supplier<CompletableFuture<Void>> nextAction; - - /** - * Constructor. - * - * @param configuration the configuration. - * @param nextAction the next action. - */ - public UpdateDistributedConfigurationAction(String configuration, Supplier<CompletableFuture<Void>> nextAction) { - this.configuration = configuration; - this.nextAction = nextAction; - } - - /** - * Returns the configuration. - * - * @return the configuration. - */ - public String configuration() { - return configuration; - } - - /** - * Returns the next action to execute. - * - * @return the next action. 
- */ - public Supplier<CompletableFuture<Void>> nextAction() { - return nextAction; - } -} diff --git a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/ConfigurationDynamicDefaultsPatcher.java b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/ConfigurationDynamicDefaultsPatcher.java index 8d2d628f41..43f876dad2 100644 --- a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/ConfigurationDynamicDefaultsPatcher.java +++ b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/ConfigurationDynamicDefaultsPatcher.java @@ -28,5 +28,5 @@ public interface ConfigurationDynamicDefaultsPatcher { * @param hocon The configuration in HOCON format. * @return The patched configuration in HOCON format. */ - public String patchWithDynamicDefaults(String hocon); + String patchWithDynamicDefaults(String hocon); } diff --git a/modules/configuration/README.md b/modules/configuration/README.md index 866d8a64bd..53e6adba28 100644 --- a/modules/configuration/README.md +++ b/modules/configuration/README.md @@ -351,6 +351,43 @@ public interface FirstPolymorphicInstanceView extends PolymorphicView { `ParentView#polymorphicChild()` will return a view of a specific type of polymorphic configuration, for example `FirstPolymorphicInstanceView`. +### Dynamic configuration defaults + +Configuration defaults are defined in the configuration schema. However, it is not possible to define them there in the following cases: + +* the value is a list (`NamedListConfiguration`). +* the default value is not known at compile time and it depends on some external factors. + +In such cases, one can override the `ConfigurationModule.patchConfigurationWithDynamicDefaults` method to provide the defaults. The method will +be called on cluster initialization with the user-provided configuration tree as an argument. + +Note that dynamic defaults are not supported for node-local configuration. 
+ +```java +public class MyConfigurationModule extends AbstractConfigurationModule { + @Override + protected void patchConfigurationWithDynamicDefaults(SuperRootChange rootChange) { + rootChange.changeRoot(SecurityConfiguration.KEY).changeAuthentication(authenticationChange -> { + if (authenticationChange.changeProviders().size() == 0) { + authenticationChange.changeProviders().create(DEFAULT_PROVIDER_NAME, change -> { + change.convert(BasicAuthenticationProviderChange.class) + .changeUsername(DEFAULT_USERNAME) + .changePassword(DEFAULT_PASSWORD) + .changeRoles(AuthorizationConfigurationSchema.DEFAULT_ROLE); + }); + } + }); + } +} +``` + +### Configuration initialization + +Custom configuration initialization can be done by calling `ConfigurationRegistry#initializeConfigurationWith` method. The method accepts +initial configuration that will be used as a base for the configuration tree. If the configuration is not provided, the default +configuration will be used. The method should be called before `ConfigurationRegistry#start` method. If the method is called after the +start, the provided configuration will be ignored. 
+ ### Changing the configuration To modify the configuration tree, one should use the `change` method, which executes the update requests diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java index 64cd8cf810..e7c5f42241 100644 --- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java +++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java @@ -49,6 +49,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -89,11 +90,18 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange /** Configuration storage. */ private final ConfigurationStorage storage; + /** Configuration validator. */ private final ConfigurationValidator configurationValidator; /** Storage trees. */ private volatile StorageRoots storageRoots; + /** + * Initial configuration. This configuration will be used to initialize the configuration if the storage is empty. If the storage is not + * empty, this configuration will be ignored. + */ + private volatile ConfigurationSource initialConfiguration = ConfigurationUtil.EMPTY_CFG_SRC; + /** Future that resolves after the defaults are persisted to the storage. */ private final CompletableFuture<Void> defaultsPersisted = new CompletableFuture<>(); @@ -103,6 +111,9 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange /** Lock for reading/updating the {@link #storageRoots}. 
Fair, to give a higher priority to external updates. */ private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true); + /** Flag indicating whether the component is started. */ + private final AtomicBoolean started = new AtomicBoolean(false); + /** * Closure interface to be used by the configuration changer. An instance of this closure is passed into the constructor and invoked * every time when there's an update from any of the storages. @@ -243,9 +254,12 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange throw new ConfigurationChangeException("Failed to initialize configuration: " + e.getMessage(), e); } + Map<String, ? extends Serializable> storageValues = data.values(); + long version = data.changeId(); + SuperRoot superRoot = new SuperRoot(rootCreator()); - Map<String, ?> dataValuesPrefixMap = toPrefixMap(data.values()); + Map<String, ?> dataValuesPrefixMap = toPrefixMap(storageValues); for (RootKey<?, ?> rootKey : rootKeys.values()) { Map<String, ?> rootPrefixMap = (Map<String, ?>) dataValuesPrefixMap.get(rootKey.key()); @@ -263,6 +277,11 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange addDefaults(superRoot); + // Fill the configuration with the initial configuration. + if (version == 0) { + initialConfiguration.descend(superRoot); + } + // Validate the restored configuration. validateConfiguration(superRoot); // We store two configuration roots, one with the defaults set and another one without them. @@ -276,6 +295,8 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange storage.registerConfigurationListener(configurationStorageListener()); persistDefaults(); + + started.set(true); } /** @@ -285,7 +306,11 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange * default values that are not persisted to the storage and writes them if there are any. 
*/ private void persistDefaults() { - changeInternally(ConfigurationUtil.EMPTY_CFG_SRC, true) + // If the storage version is 0, it indicates that the storage is empty. + // In this case, write the defaults along with the initial configuration. + ConfigurationSource cfgSrc = storageRoots.version == 0 ? initialConfiguration : ConfigurationUtil.EMPTY_CFG_SRC; + + changeInternally(cfgSrc, true) .whenComplete((v, e) -> { if (e == null) { defaultsPersisted.complete(null); @@ -295,6 +320,19 @@ public abstract class ConfigurationChanger implements DynamicConfigurationChange }); } + /** + * Sets {@link #initialConfiguration}. This configuration will be used to initialize the configuration if the storage is empty. If the + * storage is not empty, this configuration will be ignored. This method must be called before {@link #start()}. If the method is not + * called, the initial configuration will be empty. + * + * @param configurationSource the configuration source to initialize with. + */ + public void initializeConfigurationWith(ConfigurationSource configurationSource) { + assert !started.get() : "ConfigurationChanger#initializeConfigurationWith must be called before the start."; + + initialConfiguration = configurationSource; + } + /** {@inheritDoc} */ @Override public CompletableFuture<Void> change(ConfigurationSource source) { diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java index ba085c2b19..6dc00dc53a 100644 --- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java +++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java @@ -22,14 +22,11 @@ import static org.apache.ignite.internal.configuration.notifications.Configurati import static 
org.apache.ignite.internal.configuration.util.ConfigurationUtil.checkConfigurationType; import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.innerNodeVisitor; -import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import org.apache.ignite.configuration.ConfigurationTree; @@ -38,7 +35,6 @@ import org.apache.ignite.configuration.SuperRootChange; import org.apache.ignite.configuration.notifications.ConfigurationListener; import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener; import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent; -import org.apache.ignite.configuration.validation.Validator; import org.apache.ignite.internal.configuration.ConfigurationChanger.ConfigurationUpdateListener; import org.apache.ignite.internal.configuration.storage.ConfigurationStorage; import org.apache.ignite.internal.configuration.tree.ConfigurationSource; @@ -96,22 +92,6 @@ public class ConfigurationRegistry implements IgniteComponent { }); } - /** - * Registers default validator implementation to the validators map. - * - * @param validators Validators map. - * @param annotatopnType Annotation type instance for the validator. - * @param validator Validator instance. - * @param <A> Annotation type. - */ - private static <A extends Annotation> void addDefaultValidator( - Map<Class<? 
extends Annotation>, Set<Validator<?, ?>>> validators, - Class<A> annotatopnType, - Validator<A, ?> validator - ) { - validators.computeIfAbsent(annotatopnType, a -> new HashSet<>(1)).add(validator); - } - /** {@inheritDoc} */ @Override public void start() { @@ -131,13 +111,24 @@ public class ConfigurationRegistry implements IgniteComponent { return changer.onDefaultsPersisted(); } + /** + * Initializes the configuration with the given source. This method should be used only for the initial setup of the configuration. The + * configuration is initialized with the provided source only if the storage is empty, and it is saved along with the defaults. This + * method must be called before {@link #start()}. + * + * @param configurationSource the configuration source to initialize with. + */ + public void initializeConfigurationWith(ConfigurationSource configurationSource) { + changer.initializeConfigurationWith(configurationSource); + } + /** * Gets the public configuration tree. * * @param rootKey Root key. - * @param <V> View type. - * @param <C> Change type. - * @param <T> Configuration tree type. + * @param <V> View type. + * @param <C> Change type. + * @param <T> Configuration tree type. * @return Public configuration tree. 
*/ public <V, C, T extends ConfigurationTree<V, C>> T getConfiguration(RootKey<T, V> rootKey) { diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java index 7457145a44..b620eb8483 100644 --- a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java +++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java @@ -35,6 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; +import com.typesafe.config.ConfigFactory; import java.io.Serializable; import java.lang.annotation.Retention; import java.lang.annotation.Target; @@ -61,6 +62,7 @@ import org.apache.ignite.configuration.validation.ValidationContext; import org.apache.ignite.configuration.validation.ValidationIssue; import org.apache.ignite.configuration.validation.Validator; import org.apache.ignite.internal.configuration.direct.KeyPathNode; +import org.apache.ignite.internal.configuration.hocon.HoconConverter; import org.apache.ignite.internal.configuration.storage.ConfigurationStorage; import org.apache.ignite.internal.configuration.storage.Data; import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage; @@ -568,6 +570,26 @@ public class ConfigurationChangerTest { assertEquals(3, storage.lastRevision().get(1, SECONDS)); } + @Test + public void initializeWith() throws Exception { + // When we initialize the configuration with a source, the configuration should be updated with the values from the source. 
+ String initialConfiguration = "def:{child:{defStr:initialStr,arr:[bar]}}"; + com.typesafe.config.Config config = ConfigFactory.parseString(initialConfiguration); + ConfigurationSource hoconSource = HoconConverter.hoconSource(config.root()); + + ConfigurationChanger changer = createChanger(DefaultsConfiguration.KEY); + + changer.initializeConfigurationWith(hoconSource); + + changer.start(); + + DefaultsView root = (DefaultsView) changer.getRootNode(DefaultsConfiguration.KEY); + + assertEquals("foo", root.defStr()); + assertEquals("initialStr", root.child().defStr()); + assertArrayEquals(new String[]{"bar"}, root.child().arr()); + } + private static <CHANGET> ConfigurationSource source(RootKey<?, ? super CHANGET> rootKey, Consumer<CHANGET> changer) { return new ConfigurationSource() { @Override diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index fce6db22b6..719f9332b0 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.app; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -39,6 +41,7 @@ import org.apache.ignite.IgnitionManager; import org.apache.ignite.client.handler.ClientHandlerMetricSource; import org.apache.ignite.client.handler.ClientHandlerModule; import org.apache.ignite.compute.IgniteCompute; +import org.apache.ignite.configuration.ConfigurationDynamicDefaultsPatcher; import org.apache.ignite.configuration.ConfigurationModule; import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.catalog.CatalogManagerImpl; @@ -69,13 +72,13 @@ import org.apache.ignite.internal.configuration.ConfigurationManager; import 
org.apache.ignite.internal.configuration.ConfigurationModules; import org.apache.ignite.internal.configuration.ConfigurationRegistry; import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator; -import org.apache.ignite.internal.configuration.DistributedConfigurationUpdater; import org.apache.ignite.internal.configuration.JdbcPortProviderImpl; import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider; -import org.apache.ignite.internal.configuration.presentation.HoconPresentation; +import org.apache.ignite.internal.configuration.hocon.HoconConverter; import org.apache.ignite.internal.configuration.storage.ConfigurationStorage; import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage; import org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage; +import org.apache.ignite.internal.configuration.tree.ConfigurationSource; import org.apache.ignite.internal.configuration.validation.ConfigurationValidator; import org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl; import org.apache.ignite.internal.deployunit.DeploymentManagerImpl; @@ -232,15 +235,9 @@ public class IgniteImpl implements Ignite { /** Placement driver manager. */ private final PlacementDriverManager placementDriverMgr; - /** Distributed configuration validator. */ - private final ConfigurationValidator distributedConfigurationValidator; - /** Configuration manager that handles cluster (distributed) configuration. */ private final ConfigurationManager clusterCfgMgr; - /** Cluster configuration defaults setter. */ - private final ConfigurationDynamicDefaultsPatcherImpl clusterConfigurationDefaultsSetter; - /** Cluster initializer. 
*/ private final ClusterInitializer clusterInitializer; @@ -301,8 +298,6 @@ public class IgniteImpl implements Ignite { private final RestAddressReporter restAddressReporter; - private final DistributedConfigurationUpdater distributedConfigurationUpdater; - private final CatalogManager catalogManager; private final AuthenticationManager authenticationManager; @@ -407,8 +402,21 @@ public class IgniteImpl implements Ignite { modules.distributed().polymorphicSchemaExtensions() ); - distributedConfigurationValidator = - ConfigurationValidatorImpl.withDefaultValidators(distributedConfigurationGenerator, modules.distributed().validators()); + ConfigurationValidator distributedCfgValidator = ConfigurationValidatorImpl.withDefaultValidators( + distributedConfigurationGenerator, + modules.distributed().validators() + ); + + ConfigurationDynamicDefaultsPatcher clusterCfgDynamicDefaultsPatcher = new ConfigurationDynamicDefaultsPatcherImpl( + modules.distributed(), + distributedConfigurationGenerator + ); + + clusterInitializer = new ClusterInitializer( + clusterSvc, + clusterCfgDynamicDefaultsPatcher, + distributedCfgValidator + ); NodeAttributesCollector nodeAttributesCollector = new NodeAttributesCollector( @@ -416,16 +424,6 @@ public class IgniteImpl implements Ignite { nodeConfigRegistry.getConfiguration(StorageProfilesConfiguration.KEY) ); - - clusterConfigurationDefaultsSetter = - new ConfigurationDynamicDefaultsPatcherImpl(modules.distributed(), distributedConfigurationGenerator); - - clusterInitializer = new ClusterInitializer( - clusterSvc, - clusterConfigurationDefaultsSetter, - distributedConfigurationValidator - ); - cmgMgr = new ClusterManagementGroupManager( vaultMgr, clusterSvc, @@ -463,16 +461,11 @@ public class IgniteImpl implements Ignite { modules.distributed().rootKeys(), cfgStorage, distributedConfigurationGenerator, - distributedConfigurationValidator + distributedCfgValidator ); ConfigurationRegistry clusterConfigRegistry = 
clusterCfgMgr.configurationRegistry(); - distributedConfigurationUpdater = new DistributedConfigurationUpdater( - cmgMgr, - new HoconPresentation(clusterCfgMgr.configurationRegistry()) - ); - metaStorageMgr.configure(clusterConfigRegistry.getConfiguration(MetaStorageConfiguration.KEY)); Consumer<LongFunction<CompletableFuture<?>>> registry = c -> metaStorageMgr.registerRevisionUpdateListener(c::apply); @@ -803,6 +796,14 @@ public class IgniteImpl implements Ignite { return metaStorageMgr.recoveryFinishedFuture(); }, startupExecutor) + .thenComposeAsync(revision -> { + // If the revision is greater than 0, then the configuration has already been initialized. + if (revision > 0) { + return CompletableFuture.completedFuture(null); + } else { + return initializeClusterConfiguration(startupExecutor); + } + }, startupExecutor) .thenRunAsync(() -> { LOG.info("MetaStorage started, starting the remaining components"); @@ -843,13 +844,6 @@ public class IgniteImpl implements Ignite { return recoverComponentsStateOnStart(startupExecutor); }, startupExecutor) .thenComposeAsync(v -> clusterCfgMgr.configurationRegistry().onDefaultsPersisted(), startupExecutor) - .thenRunAsync(() -> { - try { - lifecycleManager.startComponent(distributedConfigurationUpdater); - } catch (NodeStoppingException e) { - throw new CompletionException(e); - } - }, startupExecutor) // Signal that local recovery is complete and the node is ready to join the cluster. .thenComposeAsync(v -> { LOG.info("Recovery complete, finishing join"); @@ -1049,8 +1043,23 @@ public class IgniteImpl implements Ignite { } /** - * Recovers components state on start by invoking configuration listeners ({@link #notifyConfigurationListeners()} - * and deploying watches after that. + * Initializes the cluster configuration with the specified user-provided configuration upon cluster initialization. 
+ */ + private CompletableFuture<Void> initializeClusterConfiguration(ExecutorService startupExecutor) { + return cmgMgr.initialClusterConfigurationFuture().thenAcceptAsync(cfg -> { + if (cfg == null) { + return; + } + + Config config = ConfigFactory.parseString(cfg); + ConfigurationSource hoconSource = HoconConverter.hoconSource(config.root()); + clusterCfgMgr.configurationRegistry().initializeConfigurationWith(hoconSource); + }, startupExecutor); + } + + /** + * Recovers components state on start by invoking configuration listeners ({@link #notifyConfigurationListeners()} and deploying watches + * after that. */ private CompletableFuture<?> recoverComponentsStateOnStart(ExecutorService startupExecutor) { CompletableFuture<Void> startupConfigurationUpdate = notifyConfigurationListeners(); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java deleted file mode 100644 index 6a6447fab5..0000000000 --- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.configuration; - -import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; -import org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation; -import org.apache.ignite.internal.logger.IgniteLogger; -import org.apache.ignite.internal.logger.Loggers; -import org.apache.ignite.internal.manager.IgniteComponent; - -/** - * Updater is responsible for applying changes to the cluster configuration when it's ready. - */ -public class DistributedConfigurationUpdater implements IgniteComponent { - - private static final IgniteLogger LOG = Loggers.forClass(DistributedConfigurationUpdater.class); - - private final ClusterManagementGroupManager cmgMgr; - - private final ConfigurationPresentation<String> presentation; - - public DistributedConfigurationUpdater(ClusterManagementGroupManager cmgMgr, ConfigurationPresentation<String> presentation) { - this.cmgMgr = cmgMgr; - this.presentation = presentation; - } - - @Override - public void start() { - cmgMgr.clusterConfigurationToUpdate() - .thenAccept(action -> { - if (action.configuration() != null) { - presentation.update(action.configuration()) - .thenApply(ignored -> action) - .thenCompose(it -> it.nextAction().get()) - .whenComplete((v, e) -> { - if (e != null) { - LOG.error("Failed to update the distributed configuration", e); - } else { - LOG.info("Distributed configuration is updated"); - } - }); - } - }); - } - - @Override - public void stop() throws Exception { - } -} diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java deleted file mode 100644 index 61a480fd31..0000000000 --- 
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.configuration; - -import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast; -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.concurrent.CompletableFuture; -import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; -import org.apache.ignite.internal.cluster.management.UpdateDistributedConfigurationAction; -import org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation; -import 
org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class DistributedConfigurationUpdaterTest extends BaseIgniteAbstractTest { - - @Mock - public ConfigurationPresentation<String> presentation; - - @Mock - public ClusterManagementGroupManager cmgMgr; - - @Test - public void nextActionIsCompletedAfterUpdatingConfiguration() { - - // Set up mocks. - when(presentation.update(anyString())).thenReturn(completedFuture(null)); - - CompletableFuture<Void> nextAction = new CompletableFuture<>(); - String configuration = "security.enabled:true"; - UpdateDistributedConfigurationAction updateDistributedConfigurationAction = - new UpdateDistributedConfigurationAction( - configuration, - () -> { - nextAction.complete(null); - return nextAction; - } - ); - - when(cmgMgr.clusterConfigurationToUpdate()) - .thenReturn(completedFuture(updateDistributedConfigurationAction)); - - // Run updater. - DistributedConfigurationUpdater distributedConfigurationUpdater = new DistributedConfigurationUpdater( - cmgMgr, - presentation - ); - - distributedConfigurationUpdater.start(); - - // Verify that configuration was updated. - verify(presentation, times(1)).update(configuration); - - // Verify that next action is completed. - assertThat(nextAction, willCompleteSuccessfully()); - } - - @Test - public void nextActionIsCompletedIfConfigurationNull() { - - // Set up mocks. - CompletableFuture<Void> nextAction = new CompletableFuture<>(); - UpdateDistributedConfigurationAction updateDistributedConfigurationAction = - new UpdateDistributedConfigurationAction( - null, - () -> { - nextAction.complete(null); - return nextAction; - } - ); - - when(cmgMgr.clusterConfigurationToUpdate()) - .thenReturn(completedFuture(updateDistributedConfigurationAction)); - - // Run updater. 
- DistributedConfigurationUpdater distributedConfigurationUpdater = new DistributedConfigurationUpdater( - cmgMgr, - presentation - ); - - distributedConfigurationUpdater.start(); - - // Verify that configuration wasn't updated. - verify(presentation, never()).update(any()); - - // Verify that next action is not completed. - assertThat(nextAction, willTimeoutFast()); - } -}