This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new dd86f44f496 IGNITE-23518 Add configuration for distributed properties
default values - Fixes #11616.
dd86f44f496 is described below
commit dd86f44f496bb8135d0ab654680127bfca1b606c
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Mon Oct 28 17:55:55 2024 +0300
IGNITE-23518 Add configuration for distributed properties default values -
Fixes #11616.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../ignite/configuration/IgniteConfiguration.java | 25 ++
.../cluster/DistributedConfigurationUtils.java | 43 ++-
.../DistributedConfigurationProcessor.java | 56 +++-
.../DistributedConfigurationDefaultValuesTest.java | 288 +++++++++++++++++++++
.../ignite/testsuites/IgnitePdsTestSuite.java | 2 +
5 files changed, 394 insertions(+), 20 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index abaa32f90f5..8f01002377e 100644
---
a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -605,6 +605,9 @@ public class IgniteConfiguration {
/** Shutdown policy for cluster. */
public ShutdownPolicy shutdown = DFLT_SHUTDOWN_POLICY;
+ /** Default values for distributed properties. */
+ private Map<String, String> distrProps;
+
/**
* Creates valid grid configuration with all default values.
*/
@@ -732,6 +735,7 @@ public class IgniteConfiguration {
sqlCfg = cfg.getSqlConfiguration();
shutdown = cfg.getShutdownPolicy();
asyncContinuationExecutor = cfg.getAsyncContinuationExecutor();
+ distrProps = cfg.getDistributedPropertiesDefaultValues();
}
/**
@@ -3599,6 +3603,27 @@ public class IgniteConfiguration {
return this;
}
+ /**
+ * Gets default values for distributed properties.
+ *
+ * @return Default values for distributed properties (the stored map itself, no copy is made).
+ */
+ public Map<String, String> getDistributedPropertiesDefaultValues() {
+ return distrProps;
+ }
+
+ /**
+ * Sets default values for distributed properties.
+ *
+ * @param distrProps Default values for distributed properties: property name mapped to its string value.
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration
setDistributedPropertiesDefaultValues(Map<String, String> distrProps) {
+ this.distrProps = distrProps;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteConfiguration.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedConfigurationUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedConfigurationUtils.java
index 1041aed386e..eb12788dfaf 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedConfigurationUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedConfigurationUtils.java
@@ -21,8 +21,10 @@ import java.io.Serializable;
import java.util.Objects;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributePropertyListener;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributedProperty;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.jetbrains.annotations.NotNull;
import static java.lang.String.format;
@@ -38,24 +40,45 @@ public final class DistributedConfigurationUtils {
}
/**
- * @param property Property which value should be set.
- * @param value Default value.
+ * @param prop Property which value should be set.
+ * @param val Default value.
* @param log Logger.
* @param <T> Property type.
+ *
+ * @return Future for the operation.
*/
- public static <T extends Serializable> void
setDefaultValue(DistributedProperty<T> property, T value, IgniteLogger log) {
- if (property.get() == null) {
+ public static <T extends Serializable> IgniteInternalFuture<Void>
setDefaultValue(
+ DistributedProperty<T> prop,
+ T val,
+ IgniteLogger log
+ ) {
+ if (prop.get() == null) {
try {
- property.propagateAsync(null, value)
- .listen(future -> {
- if (future.error() != null)
- log.error("Cannot set default value of '" +
property.getName() + '\'', future.error());
- });
+ IgniteInternalFuture<Void> fut =
(IgniteInternalFuture<Void>)prop.propagateAsync(null, val);
+
+ fut.listen(future -> {
+ if (future.error() != null)
+ log.error("Cannot set default value of '" +
prop.getName() + '\'', future.error());
+ });
+
+ return fut;
}
catch (IgniteCheckedException e) {
- log.error("Cannot initiate setting default value of '" +
property.getName() + '\'', e);
+ String errMsg = "Cannot initiate setting default value of '" +
prop.getName() + '\'';
+
+ log.error(errMsg, e);
+
+ return new GridFinishedFuture<>(new
IgniteCheckedException(errMsg, e));
}
}
+ else {
+ if (log.isDebugEnabled()) {
+ log.debug("Skip set default value for distributed property
[name=" + prop.getName() +
+ ", clusterValue=" + prop.get() + ", defaultValue=" + val +
']');
+ }
+
+ return new GridFinishedFuture<>();
+ }
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
index 5c6f91332d6..bc01987509a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java
@@ -22,15 +22,20 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.DistributedConfigurationUtils;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import
org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import
org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
import
org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
import
org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.ACTUALIZE;
@@ -47,7 +52,7 @@ public class DistributedConfigurationProcessor extends
GridProcessorAdapter impl
private static final String DIST_CONF_PREFIX = "distrConf-";
/** Properties storage. */
- private final Map<String, DistributedChangeableProperty> props = new
ConcurrentHashMap<>();
+ private final Map<String, DistributedChangeableProperty<Serializable>>
props = new ConcurrentHashMap<>();
/** Global metastorage. */
private volatile DistributedMetaStorage distributedMetastorage;
@@ -74,7 +79,7 @@ public class DistributedConfigurationProcessor extends
GridProcessorAdapter impl
distributedMetastorage.listen(
(key) -> key.startsWith(DIST_CONF_PREFIX),
(String key, Serializable oldVal, Serializable newVal) -> {
- DistributedChangeableProperty prop =
props.get(toPropertyKey(key));
+ DistributedChangeableProperty<Serializable> prop =
props.get(toPropertyKey(key));
if (prop != null)
prop.localUpdate(newVal);
@@ -94,12 +99,46 @@ public class DistributedConfigurationProcessor extends
GridProcessorAdapter impl
//Switch to cluster wide update action and do it on already
registered properties.
switchCurrentActionTo(CLUSTER_WIDE_UPDATE);
- isp.getDistributedConfigurationListeners()
-
.forEach(DistributedConfigurationLifecycleListener::onReadyToWrite);
+ IgniteInternalFuture<Void> initFut =
initDefaultPropertiesValues();
+
+ // Notify registered listeners only after propagation of
default values.
+ // Can't wait for initFut in the current thread, since it can
block discovery and deadlock is possible.
+ initFut.listen(fut ->
isp.getDistributedConfigurationListeners()
+
.forEach(DistributedConfigurationLifecycleListener::onReadyToWrite));
}
});
}
+ /** Starts setting configured default values for registered distributed properties, returns future of all operations. */
+ private IgniteInternalFuture<Void> initDefaultPropertiesValues() {
+ Map<String, String> dfltVals =
ctx.config().getDistributedPropertiesDefaultValues();
+
+ if (F.isEmpty(dfltVals))
+ return new GridFinishedFuture<>();
+
+ GridCompoundFuture<Void, Void> compFut = new GridCompoundFuture<>() {
+ @Override protected boolean ignoreFailure(Throwable err) {
+ // Do not complete the entire compound future if any property
failed.
+ return true;
+ }
+ };
+
+ for (Map.Entry<String, String> entry : dfltVals.entrySet()) {
+ DistributedChangeableProperty<Serializable> prop =
props.get(entry.getKey());
+
+ if (prop == null) {
+ log.warning("Cannot set default value for distributed property
'" + entry.getKey() +
+ "', property is not registered");
+
+ continue;
+ }
+
+ compFut.add(DistributedConfigurationUtils.setDefaultValue(prop,
prop.parse(entry.getValue()), log));
+ }
+
+ return compFut.markInitialized();
+ }
+
/**
* Switching current action to given action and do all actions from old
action to new one.
*
@@ -160,15 +199,12 @@ public class DistributedConfigurationProcessor extends
GridProcessorAdapter impl
* @return Public properties.
*/
public List<DistributedChangeableProperty<Serializable>> properties() {
- return props.values().stream()
- .filter(p -> p instanceof DistributedChangeableProperty)
- .map(p -> (DistributedChangeableProperty<Serializable>)p)
- .collect(Collectors.toList());
+ return U.sealList(props.values());
}
/** {@inheritDoc} */
@Override public @Nullable <T extends Serializable>
DistributedChangeableProperty<T> property(String name) {
- DistributedChangeableProperty<T> p = props.get(name);
+ DistributedChangeableProperty<T> p =
(DistributedChangeableProperty<T>)props.get(name);
if (!(p instanceof DistributedChangeableProperty))
return null;
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationDefaultValuesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationDefaultValuesTest.java
new file mode 100644
index 00000000000..81ecc62fd72
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationDefaultValuesTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.configuration.distributed;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Test default values configuration for distributed properties.
+ */
+public class DistributedConfigurationDefaultValuesTest extends
GridCommonAbstractTest {
+ /** Callback for {@code onReadyToRegister} of the listener installed by the test plugin, may be {@code null}. */
+ private Consumer<DistributedPropertyDispatcher> onReadyToRegister;
+
+ /** Callback for {@code onReadyToWrite} of the listener installed by the test plugin, may be {@code null}. */
+ private Runnable onReadyToWrite;
+
+ /** Distributed properties default values passed to the configuration of started nodes. */
+ Map<String, String> dfltPropVals;
+
+ /** Persistence enabled flag for the default data region. */
+ private boolean pds;
+
+ /** Logger set as the grid logger, allows tests to register log listeners. */
+ private final ListeningTestLogger listeningLog = new
ListeningTestLogger(log);
+
+ /** {@inheritDoc} Stops all grids and cleans the persistence directory. */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ super.afterTest();
+ }
+
+ /** {@inheritDoc} Also installs a plugin which forwards distributed configuration lifecycle events to test callbacks. */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setGridLogger(listeningLog);
+
+ cfg.setConsistentId(igniteInstanceName);
+
+ cfg.setDataStorageConfiguration(new
DataStorageConfiguration().setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setPersistenceEnabled(pds)
+ ));
+
+ cfg.setDistributedPropertiesDefaultValues(dfltPropVals);
+
+ cfg.setPluginProviders(new AbstractTestPluginProvider() {
+ @Override public String name() {
+ return "Distributed property register";
+ }
+
+ @Override public void start(PluginContext ctx) {
+
((IgniteEx)ctx.grid()).context().internalSubscriptionProcessor().getDistributedConfigurationListeners().add(
+ new DistributedConfigurationLifecycleListener() {
+ @Override public void
onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+ if (onReadyToRegister != null)
+ onReadyToRegister.accept(dispatcher);
+ }
+
+ @Override public void onReadyToWrite() {
+ if (onReadyToWrite != null)
+ onReadyToWrite.run();
+ }
+ }
+ );
+ }
+ });
+
+ return cfg;
+ }
+
+ /** Checks that defaults are parsed for different property types and are already set when {@code onReadyToWrite} is called. */
+ @Test
+ public void testDifferentPropertyTypes() throws Exception {
+ DistributedLongProperty longProp =
DistributedLongProperty.detachedLongProperty("longProp", "");
+ DistributedBooleanProperty boolProp =
DistributedBooleanProperty.detachedBooleanProperty("boolProp", "");
+ SimpleDistributedProperty<String> strProp = new
SimpleDistributedProperty<>("stringProp", Function.identity(), "");
+ DistributedEnumProperty<ClusterState> enumProp = new
DistributedEnumProperty<>(
+ "enumProp", "",
+ ordinal -> ordinal == null ? null :
ClusterState.fromOrdinal(ordinal),
+ state -> state == null ? null : state.ordinal(),
+ ClusterState.class
+ );
+
+ dfltPropVals = Map.of(
+ "longProp", "1",
+ "boolProp", "true",
+ "stringProp", "val",
+ "enumProp", "ACTIVE"
+ );
+
+ onReadyToRegister = dispatcher ->
dispatcher.registerProperties(longProp, boolProp, strProp, enumProp);
+
+ Map<String, String> lsnrProps = new HashMap<>();
+
+ DistributePropertyListener<Object> lsnr = (name, oldVal, newVal) -> {
+ if (newVal != null)
+ lsnrProps.put(name, newVal.toString());
+ };
+
+ longProp.addListener(lsnr);
+ boolProp.addListener(lsnr);
+ strProp.addListener(lsnr);
+ enumProp.addListener(lsnr);
+
+ // Properties values when distributed configuration is ready to write.
+ Map<String, String> lsnrProps0 = new HashMap<>();
+
+ onReadyToWrite = () -> lsnrProps0.putAll(lsnrProps);
+
+ startGrid(0);
+
+ assertEquals(dfltPropVals, lsnrProps0);
+
+ assertEquals((Long)1L, longProp.get());
+ assertEquals(Boolean.TRUE, boolProp.get());
+ assertEquals("val", strProp.get());
+ assertEquals(ClusterState.ACTIVE, enumProp.get());
+ }
+
+ /** Checks default values propagation in a cluster with persistence disabled. */
+ @Test
+ public void testInMemoryCluster() throws Exception {
+ checkDistributedPropertiesClusterPropagation(false);
+ }
+
+ /** Checks default values propagation in a cluster with persistence enabled, including cluster restart. */
+ @Test
+ public void testPersistentCluster() throws Exception {
+ checkDistributedPropertiesClusterPropagation(true);
+ }
+
+ /** Checks that configured default is used only if property has no value in cluster or PDS; {@code pds} enables persistence. */
+ private void checkDistributedPropertiesClusterPropagation(boolean pds)
throws Exception {
+ this.pds = pds;
+
+ String propName = "testProp";
+
+ // Properties with the same name, but on different nodes.
+ DistributedLongProperty prop0 =
DistributedLongProperty.detachedLongProperty(propName, "");
+ DistributedLongProperty prop1 =
DistributedLongProperty.detachedLongProperty(propName, "");
+ DistributedLongProperty prop2 =
DistributedLongProperty.detachedLongProperty(propName, "");
+
+ dfltPropVals = F.asMap(propName, "1");
+
+ onReadyToRegister = dispatcher -> dispatcher.registerProperties(prop0);
+
+ startGrid(0).cluster().state(ClusterState.ACTIVE);
+
+ assertEquals("Expecting value from default configuration", (Long)1L,
prop0.get());
+
+ dfltPropVals = null;
+
+ onReadyToRegister = dispatcher -> dispatcher.registerProperties(prop1);
+
+ startGrid(1);
+
+ assertEquals("Expecting value from cluster, when value from cluster is
not empty " +
+ "and default configuration is empty", (Long)1L, prop1.get());
+
+ dfltPropVals = F.asMap(propName, "2");
+
+ onReadyToRegister = dispatcher -> dispatcher.registerProperties(prop2);
+
+ startGrid(2);
+
+ assertEquals("Expecting value from cluster, when value from cluster
and default configuration " +
+ "is not empty", (Long)1L, prop2.get());
+
+ if (!pds) // Further checks for PDS after cluster restart.
+ return;
+
+ stopAllGrids();
+
+ // Properties for all nodes after restart.
+ DistributedLongProperty prop3 =
DistributedLongProperty.detachedLongProperty(propName, "");
+ DistributedLongProperty prop4 =
DistributedLongProperty.detachedLongProperty(propName, "");
+ DistributedLongProperty prop5 =
DistributedLongProperty.detachedLongProperty(propName, "");
+
+ dfltPropVals = F.asMap(propName, "2");
+
+ onReadyToRegister = dispatcher -> dispatcher.registerProperties(prop3);
+
+ startGrid(0);
+
+ assertEquals("Expecting value from PDS, when default configuration is
not empty and PDS is not empty", (Long)1L, prop3.get());
+
+ prop3.propagateAsync(2L).get();
+
+ dfltPropVals = null;
+
+ onReadyToRegister = dispatcher -> dispatcher.registerProperties(prop4);
+
+ startGrid(1);
+
+ assertEquals("Expecting value from cluster, when value from cluster is
not empty, default configuration " +
+ "is empty and PDS is not empty", (Long)2L, prop4.get());
+
+ dfltPropVals = F.asMap(propName, "3");
+
+ onReadyToRegister = dispatcher -> dispatcher.registerProperties(prop5);
+
+ startGrid(2);
+
+ assertEquals("Expecting value from cluster, when value from cluster is
not empty, default configuration " +
+ "is not empty and PDS is not empty", (Long)2L, prop5.get());
+ }
+
+ /** Checks that a default configured only on the second node is set and becomes visible on the first node as well. */
+ @Test
+ public void testPropertyInitBySecondNode() throws Exception {
+ String propName = "testProp";
+
+ DistributedLongProperty prop0 =
DistributedLongProperty.detachedLongProperty(propName, "");
+ DistributedLongProperty prop1 =
DistributedLongProperty.detachedLongProperty(propName, "");
+
+ onReadyToRegister = dispatcher -> dispatcher.registerProperties(prop0);
+
+ startGrid(0);
+
+ assertNull(prop0.get());
+
+ dfltPropVals = F.asMap(propName, "1");
+
+ onReadyToRegister = dispatcher -> dispatcher.registerProperties(prop1);
+
+ startGrid(1);
+
+ assertEquals((Long)1L, prop1.get());
+ assertEquals((Long)1L, prop0.get());
+ }
+
+ /** Checks that a default for a not registered property is skipped with a warning, while other defaults are applied. */
+ @Test
+ public void testNotRegisteredProperty() throws Exception {
+ String propName = "testProp";
+ String notExistingPropName = "notExistingProp";
+
+ DistributedLongProperty prop =
DistributedLongProperty.detachedLongProperty(propName, "");
+
+ dfltPropVals = F.asMap(propName, "1", notExistingPropName, "2");
+
+ onReadyToRegister = dispatcher -> dispatcher.registerProperties(prop);
+
+ String warn = "Cannot set default value for distributed property
'notExistingProp', property is not registered";
+
+ LogListener logLsnr = LogListener.matches(warn).build();
+
+ listeningLog.registerListener(logLsnr);
+
+ startGrid(0);
+
+ assertEquals((Long)1L, prop.get());
+ assertTrue(logLsnr.check());
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index bda9ad78dc0..95b29c84e93 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -44,6 +44,7 @@ import
org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePds
import
org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsCheckpointSimulationWithRealCpDisabledTest;
import
org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsPageReplacementTest;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.IgniteMetaStorageBasicTest;
+import
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationDefaultValuesTest;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationPersistentTest;
import
org.apache.ignite.internal.processors.database.IgniteDbDynamicCacheSelfTest;
import
org.apache.ignite.internal.processors.database.IgniteDbMultiNodePutGetTest;
@@ -139,6 +140,7 @@ public class IgnitePdsTestSuite {
GridTestUtils.addTestIfNeeded(suite, IgniteMetaStorageBasicTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
DistributedMetaStoragePersistentTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
DistributedConfigurationPersistentTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
DistributedConfigurationDefaultValuesTest.class, ignoredTests);
//Diagnostic
GridTestUtils.addTestIfNeeded(suite, DiagnosticProcessorTest.class,
ignoredTests);