This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c86649d19a IGNITE-20499 Broadcast user-provided initial cluster
configuration with cluster state and save it to storage along with defaults
(#2817)
c86649d19a is described below
commit c86649d19ab677799687ce3b7c8a1617ec597afd
Author: Ivan Gagarkin <[email protected]>
AuthorDate: Mon Nov 13 17:30:43 2023 +0700
IGNITE-20499 Broadcast user-provided initial cluster configuration with
cluster state and save it to storage along with defaults (#2817)
---
.../cluster/management/ItClusterManagerTest.java | 67 ++++--------
.../management/ClusterManagementGroupManager.java | 73 ++++---------
.../internal/cluster/management/ClusterState.java | 2 +-
.../UpdateDistributedConfigurationAction.java | 66 ------------
.../ConfigurationDynamicDefaultsPatcher.java | 2 +-
modules/configuration/README.md | 37 +++++++
.../configuration/ConfigurationChanger.java | 42 +++++++-
.../configuration/ConfigurationRegistry.java | 37 +++----
.../configuration/ConfigurationChangerTest.java | 22 ++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 83 ++++++++-------
.../DistributedConfigurationUpdater.java | 64 ------------
.../DistributedConfigurationUpdaterTest.java | 116 ---------------------
12 files changed, 204 insertions(+), 407 deletions(-)
diff --git
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
index da91601235..b68bbd57f6 100644
---
a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
+++
b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
@@ -19,14 +19,13 @@ package org.apache.ignite.internal.cluster.management;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -36,7 +35,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -48,7 +46,6 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.ClusterNode;
-import org.awaitility.Awaitility;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -102,6 +99,26 @@ public class ItClusterManagerTest extends
BaseItClusterManagementTest {
assertThat(cluster.get(1).logicalTopologyNodes(),
will(containsInAnyOrder(expectedTopology)));
}
+ /**
+ * Tests initial cluster setup with provided configuration.
+ */
+ @Test
+ void testInitWithProvidedConfiguration(TestInfo testInfo) throws Exception
{
+ startCluster(3, testInfo);
+
+ String[] cmgNodes = { cluster.get(0).name() };
+
+ String[] metaStorageNodes = { cluster.get(1).name() };
+
+ String configuration = "{security: {enabled: true}}";
+
+ initCluster(metaStorageNodes, cmgNodes, configuration);
+
+ for (MockNode node : cluster) {
+
assertThat(node.clusterManager().initialClusterConfigurationFuture(),
willBe(configuration));
+ }
+ }
+
/**
* Tests that init fails in case some nodes cannot be found.
*/
@@ -350,48 +367,6 @@ public class ItClusterManagerTest extends
BaseItClusterManagementTest {
assertThat(node.clusterManager().onJoinReady(),
willCompleteSuccessfully());
}
- @Test
- void
testClusterConfigurationIsRemovedFromClusterStateAfterUpdating(TestInfo
testInfo) throws Exception {
- // Start a cluster of 3 nodes so that the CMG leader node could be
stopped later.
- startCluster(3, testInfo);
-
- String[] cmgNodes = clusterNodeNames();
-
- // Start the CMG on all 3 nodes.
- String clusterConfiguration = "security.authentication.enabled:true";
- initCluster(cmgNodes, cmgNodes, clusterConfiguration);
-
- // Find the CMG leader and stop it.
- MockNode leaderNode = findLeaderNode(cluster).orElseThrow();
-
- // Read cluster configuration from the cluster state and remove it.
- UpdateDistributedConfigurationAction leaderAction =
leaderNode.clusterManager()
- .clusterConfigurationToUpdate()
- .get();
-
- // Check the leader has configuration.
- assertEquals(clusterConfiguration, leaderAction.configuration());
-
- // Execute the next action (remove the configuration from the cluster
state)
- assertThat(leaderAction.nextAction().get(),
willCompleteSuccessfully());
-
- // Stop the cluster leader to check the new leader is not going to
update the configuration.
- stopNodes(List.of(leaderNode));
- cluster.remove(leaderNode);
-
- // Wait for a new leader to be elected.
- MockNode newLeaderNode = Awaitility.await()
- .timeout(60, TimeUnit.SECONDS)
- .until(() -> findLeaderNode(cluster), Optional::isPresent)
- .get();
-
- // Check the new leader cancels the action.
- assertThat(
- newLeaderNode.clusterManager().clusterConfigurationToUpdate(),
- willThrow(CancellationException.class, 5, TimeUnit.SECONDS)
- );
- }
-
@Test
void testLeaderChangeBeforeJoin(TestInfo testInfo) throws Exception {
// Start a cluster of 3 nodes so that the CMG leader node could be
stopped later.
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index d880b2412a..cb071bc8a9 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -103,10 +103,6 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
*/
private final CompletableFuture<Void> joinFuture = new
CompletableFuture<>();
- // TODO: IGNITE-19489 Cancel updateDistributedConfigurationActionFuture if
the configuration is applied
- private final CompletableFuture<UpdateDistributedConfigurationAction>
updateDistributedConfigurationActionFuture =
- new CompletableFuture<>();
-
/** Message factory. */
private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
@@ -133,6 +129,9 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
/** Local node's attributes. */
private final NodeAttributes nodeAttributes;
+ /** Future that resolves into the initial cluster configuration in HOCON
format. */
+ private final CompletableFuture<String> initialClusterConfigurationFuture
= new CompletableFuture<>();
+
/** Constructor. */
public ClusterManagementGroupManager(
VaultManager vault,
@@ -356,6 +355,12 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
private void onElectedAsLeader(long term) {
LOG.info("CMG leader has been elected, executing onLeaderElected
callback");
+ // The cluster state is broadcast via the messaging service; hence,
the future must be completed here on the leader node.
+ // TODO: This needs to be reworked following the implementation of
IGNITE-18275.
+ raftServiceAfterJoin()
+ .thenCompose(CmgRaftService::readClusterState)
+ .thenAccept(state ->
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
+
raftServiceAfterJoin()
.thenCompose(this::updateLogicalTopology)
.thenCompose(service ->
service.updateLearners(term).thenApply(unused -> service))
@@ -384,50 +389,6 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
LOG.info("onLeaderElected callback executed
successfully");
}
});
-
- raftServiceAfterJoin().thenCompose(service ->
service.readClusterState()
- .whenComplete((state, e) -> {
- if (e != null) {
- LOG.error("Error when retrieving cluster
configuration", e);
-
updateDistributedConfigurationActionFuture.completeExceptionally(e);
- } else {
- String configuration =
state.initialClusterConfiguration();
- if (configuration != null) {
-
updateDistributedConfigurationActionFuture.complete(
- new UpdateDistributedConfigurationAction(
- configuration,
- () ->
removeClusterConfigFromClusterState(service)
- ));
- } else {
-
updateDistributedConfigurationActionFuture.cancel(true);
- }
- }
- })
- );
- }
-
- private CompletableFuture<Void>
removeClusterConfigFromClusterState(CmgRaftService service) {
- return service.readClusterState()
- .thenCompose(state -> {
- if (state.initialClusterConfiguration() != null) {
- ClusterState clusterState = msgFactory.clusterState()
- .cmgNodes(Set.copyOf(state.cmgNodes()))
-
.metaStorageNodes(Set.copyOf(state.metaStorageNodes()))
- .version(state.igniteVersion().toString())
- .clusterTag(state.clusterTag())
- .build();
- return service.updateClusterState(clusterState)
- .whenComplete((v, e) -> {
- if (e != null) {
- LOG.error("Error when removing
configuration from cluster state", e);
- } else {
- LOG.info("Cluster configuration is
removed from cluster state");
- }
- });
- } else {
- return completedFuture(null);
- }
- });
}
/**
@@ -488,6 +449,9 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
ClusterState state = msg.clusterState();
+ // Complete the initialClusterConfigurationFuture to initialize the
cluster configuration on the local node.
+
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration());
+
synchronized (raftServiceLock) {
if (raftService == null) {
LOG.info("ClusterStateMessage received, starting the CMG
[nodes={}]", state.cmgNodes());
@@ -718,7 +682,8 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
// Fail the futures to unblock dependent operations
joinFuture.completeExceptionally(new NodeStoppingException());
- updateDistributedConfigurationActionFuture.completeExceptionally(new
NodeStoppingException());
+
+ initialClusterConfigurationFuture.completeExceptionally(new
NodeStoppingException());
}
/**
@@ -828,8 +793,14 @@ public class ClusterManagementGroupManager implements
IgniteComponent {
}
}
- public CompletableFuture<UpdateDistributedConfigurationAction>
clusterConfigurationToUpdate() {
- return updateDistributedConfigurationActionFuture;
+ /**
+ * Returns a future resolving to the initial cluster configuration in
HOCON format. The resulting configuration may be {@code null} if
+ * not provided by the user.
+ *
+ * @return a CompletableFuture that, upon completion, provides the initial
cluster configuration, which may be {@code null}.
+ */
+ public CompletableFuture<String> initialClusterConfigurationFuture() {
+ return initialClusterConfigurationFuture;
}
/**
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java
index 82a125cb3c..96bb8f8f5d 100644
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java
+++
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterState.java
@@ -64,7 +64,7 @@ public interface ClusterState extends NetworkMessage,
Serializable {
}
/**
- * Returns a cluster configuration that should be applied.
+ * Returns initial cluster configuration.
*/
@Nullable
String initialClusterConfiguration();
diff --git
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java
b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java
deleted file mode 100644
index 9503616e1f..0000000000
---
a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.cluster.management;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
-
-/**
- * Action to update the distributed configuration.
- */
-public class UpdateDistributedConfigurationAction {
-
- /**
- * Configuration that should be applied.
- */
- private final String configuration;
-
- /**
- * The next action to execute.
- */
- private final Supplier<CompletableFuture<Void>> nextAction;
-
- /**
- * Constructor.
- *
- * @param configuration the configuration.
- * @param nextAction the next action.
- */
- public UpdateDistributedConfigurationAction(String configuration,
Supplier<CompletableFuture<Void>> nextAction) {
- this.configuration = configuration;
- this.nextAction = nextAction;
- }
-
- /**
- * Returns the configuration.
- *
- * @return the configuration.
- */
- public String configuration() {
- return configuration;
- }
-
- /**
- * Returns the next action to execute.
- *
- * @return the next action.
- */
- public Supplier<CompletableFuture<Void>> nextAction() {
- return nextAction;
- }
-}
diff --git
a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/ConfigurationDynamicDefaultsPatcher.java
b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/ConfigurationDynamicDefaultsPatcher.java
index 8d2d628f41..43f876dad2 100644
---
a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/ConfigurationDynamicDefaultsPatcher.java
+++
b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/ConfigurationDynamicDefaultsPatcher.java
@@ -28,5 +28,5 @@ public interface ConfigurationDynamicDefaultsPatcher {
* @param hocon The configuration in HOCON format.
* @return The patched configuration in HOCON format.
*/
- public String patchWithDynamicDefaults(String hocon);
+ String patchWithDynamicDefaults(String hocon);
}
diff --git a/modules/configuration/README.md b/modules/configuration/README.md
index 866d8a64bd..53e6adba28 100644
--- a/modules/configuration/README.md
+++ b/modules/configuration/README.md
@@ -351,6 +351,43 @@ public interface FirstPolymorphicInstanceView extends
PolymorphicView {
`ParentView#polymorphicChild()` will return a view of a specific type of
polymorphic configuration, for example `FirstPolymorphicInstanceView`.
+### Dynamic configuration defaults
+
+Configuration defaults are defined in the configuration schema. However, it is
not possible to define them there in the following cases:
+
+* the value is a list (`NamedListConfiguration`).
+* the default value is not known at compile time and it depends on some
external factors.
+
+In such cases, one can override the
`ConfigurationModule.patchConfigurationWithDynamicDefaults` method to provide
the defaults. The method will
+be called on cluster initialization with the user-provided configuration tree
as an argument.
+
+Note that dynamic defaults are not supported for node-local configuration.
+
+```java
+public class MyConfigurationModule extends AbstractConfigurationModule {
+ @Override
+ protected void patchConfigurationWithDynamicDefaults(SuperRootChange
rootChange) {
+
rootChange.changeRoot(SecurityConfiguration.KEY).changeAuthentication(authenticationChange
-> {
+ if (authenticationChange.changeProviders().size() == 0) {
+ authenticationChange.changeProviders().create(DEFAULT_PROVIDER_NAME,
change -> {
+ change.convert(BasicAuthenticationProviderChange.class)
+ .changeUsername(DEFAULT_USERNAME)
+ .changePassword(DEFAULT_PASSWORD)
+ .changeRoles(AuthorizationConfigurationSchema.DEFAULT_ROLE);
+ });
+ }
+ });
+ }
+}
+```
+
+### Configuration initialization
+
+Custom configuration initialization can be done by calling the
`ConfigurationRegistry#initializeConfigurationWith` method. The method accepts
+an initial configuration that will be used as a base for the configuration tree.
If the configuration is not provided, the default
+configuration will be used. The method should be called before the
`ConfigurationRegistry#start` method. If the method is called after
+start, the provided configuration will be ignored.
+
### Changing the configuration
To modify the configuration tree, one should use the `change` method, which
executes the update requests
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
index 64cd8cf810..e7c5f42241 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
@@ -49,6 +49,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -89,11 +90,18 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
/** Configuration storage. */
private final ConfigurationStorage storage;
+ /** Configuration validator. */
private final ConfigurationValidator configurationValidator;
/** Storage trees. */
private volatile StorageRoots storageRoots;
+ /**
+ * Initial configuration. This configuration will be used to initialize
the configuration if the storage is empty. If the storage is not
+ * empty, this configuration will be ignored.
+ */
+ private volatile ConfigurationSource initialConfiguration =
ConfigurationUtil.EMPTY_CFG_SRC;
+
/** Future that resolves after the defaults are persisted to the storage.
*/
private final CompletableFuture<Void> defaultsPersisted = new
CompletableFuture<>();
@@ -103,6 +111,9 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
/** Lock for reading/updating the {@link #storageRoots}. Fair, to give a
higher priority to external updates. */
private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+ /** Flag indicating whether the component is started. */
+ private final AtomicBoolean started = new AtomicBoolean(false);
+
/**
* Closure interface to be used by the configuration changer. An instance
of this closure is passed into the constructor and invoked
* every time when there's an update from any of the storages.
@@ -243,9 +254,12 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
throw new ConfigurationChangeException("Failed to initialize
configuration: " + e.getMessage(), e);
}
+ Map<String, ? extends Serializable> storageValues = data.values();
+ long version = data.changeId();
+
SuperRoot superRoot = new SuperRoot(rootCreator());
- Map<String, ?> dataValuesPrefixMap = toPrefixMap(data.values());
+ Map<String, ?> dataValuesPrefixMap = toPrefixMap(storageValues);
for (RootKey<?, ?> rootKey : rootKeys.values()) {
Map<String, ?> rootPrefixMap = (Map<String, ?>)
dataValuesPrefixMap.get(rootKey.key());
@@ -263,6 +277,11 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
addDefaults(superRoot);
+ // Fill the configuration with the initial configuration.
+ if (version == 0) {
+ initialConfiguration.descend(superRoot);
+ }
+
// Validate the restored configuration.
validateConfiguration(superRoot);
// We store two configuration roots, one with the defaults set and
another one without them.
@@ -276,6 +295,8 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
storage.registerConfigurationListener(configurationStorageListener());
persistDefaults();
+
+ started.set(true);
}
/**
@@ -285,7 +306,11 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
* default values that are not persisted to the storage and writes them if
there are any.
*/
private void persistDefaults() {
- changeInternally(ConfigurationUtil.EMPTY_CFG_SRC, true)
+ // If the storage version is 0, it indicates that the storage is empty.
+ // In this case, write the defaults along with the initial
configuration.
+ ConfigurationSource cfgSrc = storageRoots.version == 0 ?
initialConfiguration : ConfigurationUtil.EMPTY_CFG_SRC;
+
+ changeInternally(cfgSrc, true)
.whenComplete((v, e) -> {
if (e == null) {
defaultsPersisted.complete(null);
@@ -295,6 +320,19 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
});
}
+ /**
+ * Sets {@link #initialConfiguration}. This configuration will be used to
initialize the configuration if the storage is empty. If the
+ * storage is not empty, this configuration will be ignored. This method
must be called before {@link #start()}. If the method is not
+ * called, the initial configuration will be empty.
+ *
+ * @param configurationSource the configuration source to initialize with.
+ */
+ public void initializeConfigurationWith(ConfigurationSource
configurationSource) {
+ assert !started.get() :
"ConfigurationChanger#initializeConfigurationWith must be called before the
start.";
+
+ initialConfiguration = configurationSource;
+ }
+
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> change(ConfigurationSource source) {
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
index ba085c2b19..6dc00dc53a 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
@@ -22,14 +22,11 @@ import static
org.apache.ignite.internal.configuration.notifications.Configurati
import static
org.apache.ignite.internal.configuration.util.ConfigurationUtil.checkConfigurationType;
import static
org.apache.ignite.internal.configuration.util.ConfigurationUtil.innerNodeVisitor;
-import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.ignite.configuration.ConfigurationTree;
@@ -38,7 +35,6 @@ import org.apache.ignite.configuration.SuperRootChange;
import org.apache.ignite.configuration.notifications.ConfigurationListener;
import
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
-import org.apache.ignite.configuration.validation.Validator;
import
org.apache.ignite.internal.configuration.ConfigurationChanger.ConfigurationUpdateListener;
import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
import org.apache.ignite.internal.configuration.tree.ConfigurationSource;
@@ -96,22 +92,6 @@ public class ConfigurationRegistry implements
IgniteComponent {
});
}
- /**
- * Registers default validator implementation to the validators map.
- *
- * @param validators Validators map.
- * @param annotatopnType Annotation type instance for the validator.
- * @param validator Validator instance.
- * @param <A> Annotation type.
- */
- private static <A extends Annotation> void addDefaultValidator(
- Map<Class<? extends Annotation>, Set<Validator<?, ?>>> validators,
- Class<A> annotatopnType,
- Validator<A, ?> validator
- ) {
- validators.computeIfAbsent(annotatopnType, a -> new
HashSet<>(1)).add(validator);
- }
-
/** {@inheritDoc} */
@Override
public void start() {
@@ -131,13 +111,24 @@ public class ConfigurationRegistry implements
IgniteComponent {
return changer.onDefaultsPersisted();
}
+ /**
+ * Initializes the configuration with the given source. This method should
be used only for the initial setup of the configuration. The
+ * configuration is initialized with the provided source only if the
storage is empty, and it is saved along with the defaults. This
+ * method must be called before {@link #start()}.
+ *
+ * @param configurationSource the configuration source to initialize with.
+ */
+ public void initializeConfigurationWith(ConfigurationSource
configurationSource) {
+ changer.initializeConfigurationWith(configurationSource);
+ }
+
/**
* Gets the public configuration tree.
*
* @param rootKey Root key.
- * @param <V> View type.
- * @param <C> Change type.
- * @param <T> Configuration tree type.
+ * @param <V> View type.
+ * @param <C> Change type.
+ * @param <T> Configuration tree type.
* @return Public configuration tree.
*/
public <V, C, T extends ConfigurationTree<V, C>> T
getConfiguration(RootKey<T, V> rootKey) {
diff --git
a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
index 7457145a44..b620eb8483 100644
---
a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
+++
b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationChangerTest.java
@@ -35,6 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
+import com.typesafe.config.ConfigFactory;
import java.io.Serializable;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
@@ -61,6 +62,7 @@ import
org.apache.ignite.configuration.validation.ValidationContext;
import org.apache.ignite.configuration.validation.ValidationIssue;
import org.apache.ignite.configuration.validation.Validator;
import org.apache.ignite.internal.configuration.direct.KeyPathNode;
+import org.apache.ignite.internal.configuration.hocon.HoconConverter;
import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
import org.apache.ignite.internal.configuration.storage.Data;
import
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
@@ -568,6 +570,26 @@ public class ConfigurationChangerTest {
assertEquals(3, storage.lastRevision().get(1, SECONDS));
}
+ @Test
+ public void initializeWith() throws Exception {
+ // When we initialize the configuration with a source, the
configuration should be updated with the values from the source.
+ String initialConfiguration =
"def:{child:{defStr:initialStr,arr:[bar]}}";
+ com.typesafe.config.Config config =
ConfigFactory.parseString(initialConfiguration);
+ ConfigurationSource hoconSource =
HoconConverter.hoconSource(config.root());
+
+ ConfigurationChanger changer =
createChanger(DefaultsConfiguration.KEY);
+
+ changer.initializeConfigurationWith(hoconSource);
+
+ changer.start();
+
+ DefaultsView root = (DefaultsView)
changer.getRootNode(DefaultsConfiguration.KEY);
+
+ assertEquals("foo", root.defStr());
+ assertEquals("initialStr", root.child().defStr());
+ assertArrayEquals(new String[]{"bar"}, root.child().arr());
+ }
+
private static <CHANGET> ConfigurationSource source(RootKey<?, ? super
CHANGET> rootKey, Consumer<CHANGET> changer) {
return new ConfigurationSource() {
@Override
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index fce6db22b6..719f9332b0 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.app;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -39,6 +41,7 @@ import org.apache.ignite.IgnitionManager;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientHandlerModule;
import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.configuration.ConfigurationDynamicDefaultsPatcher;
import org.apache.ignite.configuration.ConfigurationModule;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
@@ -69,13 +72,13 @@ import
org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.ConfigurationModules;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
-import
org.apache.ignite.internal.configuration.DistributedConfigurationUpdater;
import org.apache.ignite.internal.configuration.JdbcPortProviderImpl;
import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
-import org.apache.ignite.internal.configuration.presentation.HoconPresentation;
+import org.apache.ignite.internal.configuration.hocon.HoconConverter;
import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
import
org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
import
org.apache.ignite.internal.configuration.storage.LocalFileConfigurationStorage;
+import org.apache.ignite.internal.configuration.tree.ConfigurationSource;
import
org.apache.ignite.internal.configuration.validation.ConfigurationValidator;
import
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
import org.apache.ignite.internal.deployunit.DeploymentManagerImpl;
@@ -232,15 +235,9 @@ public class IgniteImpl implements Ignite {
/** Placement driver manager. */
private final PlacementDriverManager placementDriverMgr;
- /** Distributed configuration validator. */
- private final ConfigurationValidator distributedConfigurationValidator;
-
/** Configuration manager that handles cluster (distributed)
configuration. */
private final ConfigurationManager clusterCfgMgr;
- /** Cluster configuration defaults setter. */
- private final ConfigurationDynamicDefaultsPatcherImpl
clusterConfigurationDefaultsSetter;
-
/** Cluster initializer. */
private final ClusterInitializer clusterInitializer;
@@ -301,8 +298,6 @@ public class IgniteImpl implements Ignite {
private final RestAddressReporter restAddressReporter;
- private final DistributedConfigurationUpdater
distributedConfigurationUpdater;
-
private final CatalogManager catalogManager;
private final AuthenticationManager authenticationManager;
@@ -407,8 +402,21 @@ public class IgniteImpl implements Ignite {
modules.distributed().polymorphicSchemaExtensions()
);
- distributedConfigurationValidator =
-
ConfigurationValidatorImpl.withDefaultValidators(distributedConfigurationGenerator,
modules.distributed().validators());
+ ConfigurationValidator distributedCfgValidator =
ConfigurationValidatorImpl.withDefaultValidators(
+ distributedConfigurationGenerator,
+ modules.distributed().validators()
+ );
+
+ ConfigurationDynamicDefaultsPatcher clusterCfgDynamicDefaultsPatcher =
new ConfigurationDynamicDefaultsPatcherImpl(
+ modules.distributed(),
+ distributedConfigurationGenerator
+ );
+
+ clusterInitializer = new ClusterInitializer(
+ clusterSvc,
+ clusterCfgDynamicDefaultsPatcher,
+ distributedCfgValidator
+ );
NodeAttributesCollector nodeAttributesCollector =
new NodeAttributesCollector(
@@ -416,16 +424,6 @@ public class IgniteImpl implements Ignite {
nodeConfigRegistry.getConfiguration(StorageProfilesConfiguration.KEY)
);
-
- clusterConfigurationDefaultsSetter =
- new
ConfigurationDynamicDefaultsPatcherImpl(modules.distributed(),
distributedConfigurationGenerator);
-
- clusterInitializer = new ClusterInitializer(
- clusterSvc,
- clusterConfigurationDefaultsSetter,
- distributedConfigurationValidator
- );
-
cmgMgr = new ClusterManagementGroupManager(
vaultMgr,
clusterSvc,
@@ -463,16 +461,11 @@ public class IgniteImpl implements Ignite {
modules.distributed().rootKeys(),
cfgStorage,
distributedConfigurationGenerator,
- distributedConfigurationValidator
+ distributedCfgValidator
);
ConfigurationRegistry clusterConfigRegistry =
clusterCfgMgr.configurationRegistry();
- distributedConfigurationUpdater = new DistributedConfigurationUpdater(
- cmgMgr,
- new HoconPresentation(clusterCfgMgr.configurationRegistry())
- );
-
metaStorageMgr.configure(clusterConfigRegistry.getConfiguration(MetaStorageConfiguration.KEY));
Consumer<LongFunction<CompletableFuture<?>>> registry = c ->
metaStorageMgr.registerRevisionUpdateListener(c::apply);
@@ -803,6 +796,14 @@ public class IgniteImpl implements Ignite {
return metaStorageMgr.recoveryFinishedFuture();
}, startupExecutor)
+ .thenComposeAsync(revision -> {
+ // If the revision is greater than 0, then the
configuration has already been initialized.
+ if (revision > 0) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return
initializeClusterConfiguration(startupExecutor);
+ }
+ }, startupExecutor)
.thenRunAsync(() -> {
LOG.info("MetaStorage started, starting the remaining
components");
@@ -843,13 +844,6 @@ public class IgniteImpl implements Ignite {
return recoverComponentsStateOnStart(startupExecutor);
}, startupExecutor)
.thenComposeAsync(v ->
clusterCfgMgr.configurationRegistry().onDefaultsPersisted(), startupExecutor)
- .thenRunAsync(() -> {
- try {
-
lifecycleManager.startComponent(distributedConfigurationUpdater);
- } catch (NodeStoppingException e) {
- throw new CompletionException(e);
- }
- }, startupExecutor)
// Signal that local recovery is complete and the node is
ready to join the cluster.
.thenComposeAsync(v -> {
LOG.info("Recovery complete, finishing join");
@@ -1049,8 +1043,23 @@ public class IgniteImpl implements Ignite {
}
/**
- * Recovers components state on start by invoking configuration listeners
({@link #notifyConfigurationListeners()}
- * and deploying watches after that.
+ * Initializes the cluster configuration with the specified user-provided
configuration upon cluster initialization.
+ */
+ private CompletableFuture<Void>
initializeClusterConfiguration(ExecutorService startupExecutor) {
+ return cmgMgr.initialClusterConfigurationFuture().thenAcceptAsync(cfg
-> {
+ if (cfg == null) {
+ return;
+ }
+
+ Config config = ConfigFactory.parseString(cfg);
+ ConfigurationSource hoconSource =
HoconConverter.hoconSource(config.root());
+
clusterCfgMgr.configurationRegistry().initializeConfigurationWith(hoconSource);
+ }, startupExecutor);
+ }
+
+ /**
+ * Recovers components state on start by invoking configuration listeners
({@link #notifyConfigurationListeners()}) and deploying watches
+ * after that.
*/
private CompletableFuture<?> recoverComponentsStateOnStart(ExecutorService
startupExecutor) {
CompletableFuture<Void> startupConfigurationUpdate =
notifyConfigurationListeners();
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java
b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java
deleted file mode 100644
index 6a6447fab5..0000000000
---
a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.configuration;
-
-import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import
org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.manager.IgniteComponent;
-
-/**
- * Updater is responsible for applying changes to the cluster configuration
when it's ready.
- */
-public class DistributedConfigurationUpdater implements IgniteComponent {
-
- private static final IgniteLogger LOG =
Loggers.forClass(DistributedConfigurationUpdater.class);
-
- private final ClusterManagementGroupManager cmgMgr;
-
- private final ConfigurationPresentation<String> presentation;
-
- public DistributedConfigurationUpdater(ClusterManagementGroupManager
cmgMgr, ConfigurationPresentation<String> presentation) {
- this.cmgMgr = cmgMgr;
- this.presentation = presentation;
- }
-
- @Override
- public void start() {
- cmgMgr.clusterConfigurationToUpdate()
- .thenAccept(action -> {
- if (action.configuration() != null) {
- presentation.update(action.configuration())
- .thenApply(ignored -> action)
- .thenCompose(it -> it.nextAction().get())
- .whenComplete((v, e) -> {
- if (e != null) {
- LOG.error("Failed to update the
distributed configuration", e);
- } else {
- LOG.info("Distributed configuration is
updated");
- }
- });
- }
- });
- }
-
- @Override
- public void stop() throws Exception {
- }
-}
diff --git
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java
deleted file mode 100644
index 61a480fd31..0000000000
---
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.configuration;
-
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
-import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.concurrent.CompletableFuture;
-import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import
org.apache.ignite.internal.cluster.management.UpdateDistributedConfigurationAction;
-import
org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation;
-import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-@ExtendWith(MockitoExtension.class)
-class DistributedConfigurationUpdaterTest extends BaseIgniteAbstractTest {
-
- @Mock
- public ConfigurationPresentation<String> presentation;
-
- @Mock
- public ClusterManagementGroupManager cmgMgr;
-
- @Test
- public void nextActionIsCompletedAfterUpdatingConfiguration() {
-
- // Set up mocks.
-
when(presentation.update(anyString())).thenReturn(completedFuture(null));
-
- CompletableFuture<Void> nextAction = new CompletableFuture<>();
- String configuration = "security.enabled:true";
- UpdateDistributedConfigurationAction
updateDistributedConfigurationAction =
- new UpdateDistributedConfigurationAction(
- configuration,
- () -> {
- nextAction.complete(null);
- return nextAction;
- }
- );
-
- when(cmgMgr.clusterConfigurationToUpdate())
-
.thenReturn(completedFuture(updateDistributedConfigurationAction));
-
- // Run updater.
- DistributedConfigurationUpdater distributedConfigurationUpdater = new
DistributedConfigurationUpdater(
- cmgMgr,
- presentation
- );
-
- distributedConfigurationUpdater.start();
-
- // Verify that configuration was updated.
- verify(presentation, times(1)).update(configuration);
-
- // Verify that next action is completed.
- assertThat(nextAction, willCompleteSuccessfully());
- }
-
- @Test
- public void nextActionIsCompletedIfConfigurationNull() {
-
- // Set up mocks.
- CompletableFuture<Void> nextAction = new CompletableFuture<>();
- UpdateDistributedConfigurationAction
updateDistributedConfigurationAction =
- new UpdateDistributedConfigurationAction(
- null,
- () -> {
- nextAction.complete(null);
- return nextAction;
- }
- );
-
- when(cmgMgr.clusterConfigurationToUpdate())
-
.thenReturn(completedFuture(updateDistributedConfigurationAction));
-
- // Run updater.
- DistributedConfigurationUpdater distributedConfigurationUpdater = new
DistributedConfigurationUpdater(
- cmgMgr,
- presentation
- );
-
- distributedConfigurationUpdater.start();
-
- // Verify that configuration wasn't updated.
- verify(presentation, never()).update(any());
-
- // Verify that next action is not completed.
- assertThat(nextAction, willTimeoutFast());
- }
-}