This is an automated email from the ASF dual-hosted git repository. dgovorukhin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 72fdb15 IGNITE-8573 Save baseline auto-adjust parameters to metastore - Fixes #5806. 72fdb15 is described below commit 72fdb1517fe11dfc1be82076f5544077bf9ec829 Author: Anton Kalashnikov <kaa....@yandex.ru> AuthorDate: Fri Jan 18 11:20:01 2019 +0300 IGNITE-8573 Save baseline auto-adjust parameters to metastore - Fixes #5806. Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> --- .../main/java/org/apache/ignite/IgniteCluster.java | 8 + .../ignite/configuration/IgniteConfiguration.java | 87 ++++++- .../apache/ignite/internal/GridKernalContext.java | 8 + .../ignite/internal/GridKernalContextImpl.java | 12 + .../org/apache/ignite/internal/IgniteKernal.java | 2 + .../cluster/DistributedBaselineConfiguration.java | 109 ++++++++ .../internal/cluster/IgniteClusterAsyncImpl.java | 5 + .../ignite/internal/cluster/IgniteClusterImpl.java | 50 ++-- .../wal/reader/StandaloneGridKernalContext.java | 6 + .../distributed/DetachedPropertyException.java | 34 +++ .../distributed/DistributedBooleanProperty.java | 41 +++ .../distributed/DistributedComparableProperty.java | 63 +++++ .../DistributedConfigurationLifecycleListener.java | 29 +++ .../DistributedConfigurationProcessor.java | 281 +++++++++++++++++++++ .../distributed/DistributedLongProperty.java | 41 +++ .../distributed/DistributedProperty.java | 118 +++++++++ .../distributed/DistributedPropertyDispatcher.java | 53 ++++ .../ReadOnlyDistributedMetaStorageBridge.java | 2 +- .../platform/utils/PlatformConfigurationUtils.java | 12 + .../GridInternalSubscriptionProcessor.java | 32 ++- .../util/lang/IgniteThrowableBiConsumer.java | 38 +++ .../org.apache.ignite.plugin.PluginProvider | 1 + .../distributed/DistributedConfigurationTest.java | 242 ++++++++++++++++++ .../TestDistibutedConfigurationPlugin.java | 119 +++++++++ .../junits/multijvm/IgniteClusterProcessProxy.java | 6 + .../ApiParity/ClusterParityTest.cs | 3 +- .../IgniteConfigurationTest.cs | 
4 + .../Apache.Ignite.Core/IgniteConfiguration.cs | 60 +++++ .../IgniteConfigurationSection.xsd | 15 ++ 29 files changed, 1443 insertions(+), 38 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java index fc0e81b..5d6b9f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java @@ -26,6 +26,7 @@ import org.apache.ignite.cluster.BaselineNode; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterStartNodeResult; +import org.apache.ignite.internal.cluster.DistributedBaselineConfiguration; import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteFuture; @@ -530,4 +531,11 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport { * @see #enableWal(String) */ public boolean isWalEnabled(String cacheName); + + /** + * All distributed properties of baseline. + * + * @return Distributed baseline configuration. 
+ */ + public DistributedBaselineConfiguration baselineConfiguration(); } diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 9498c5b..00178e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -17,11 +17,6 @@ package org.apache.ignite.configuration; -import java.io.Serializable; -import java.lang.management.ManagementFactory; -import java.util.Map; -import java.util.UUID; -import java.util.zip.Deflater; import javax.cache.configuration.Factory; import javax.cache.event.CacheEntryListener; import javax.cache.expiry.ExpiryPolicy; @@ -29,6 +24,11 @@ import javax.cache.integration.CacheLoader; import javax.cache.processor.EntryProcessor; import javax.management.MBeanServer; import javax.net.ssl.SSLContext; +import java.io.Serializable; +import java.lang.management.ManagementFactory; +import java.util.Map; +import java.util.UUID; +import java.util.zip.Deflater; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; @@ -225,6 +225,18 @@ public class IgniteConfiguration { /** Default time interval between MVCC vacuum runs in milliseconds. */ public static final long DFLT_MVCC_VACUUM_FREQUENCY = 5000; + /** Default of initial value of manual baseline control or auto adjusting baseline. */ + public static final boolean DFLT_INIT_BASELINE_AUTO_ADJUST_ENABLED = false; + + /** + * Initial value of time which we would wait before the actual topology change since last discovery event(node + * join/exit). + */ + public static final long DFLT_INIT_BASELINE_AUTO_ADJUST_TIMEOUT = 0; + + /** Initial value of time which we would wait from the first discovery event in the chain(node join/exit). 
*/ + public static final long DFLT_INIT_BASELINE_AUTO_ADJUST_MAX_TIMEOUT = 0; + /** Optional local Ignite instance name. */ private String igniteInstanceName; @@ -524,6 +536,18 @@ public class IgniteConfiguration { /** SQL schemas to be created on node start. */ private String[] sqlSchemas; + /** Initial value of manual baseline control or auto adjusting baseline. */ + private boolean initBaselineAutoAdjustEnabled = DFLT_INIT_BASELINE_AUTO_ADJUST_ENABLED; + + /** + * Initial value of time which we would wait before the actual topology change since last discovery event(node + * join/exit). + */ + private long initBaselineAutoAdjustTimeout = DFLT_INIT_BASELINE_AUTO_ADJUST_TIMEOUT; + + /** Initial value of time which we would wait from the first discovery event in the chain(node join/exit). */ + private long initBaselineAutoAdjustMaxTimeout = DFLT_INIT_BASELINE_AUTO_ADJUST_MAX_TIMEOUT; + /** * Creates valid grid configuration with all default values. */ @@ -3167,6 +3191,59 @@ public class IgniteConfiguration { return this; } + /** + * Gets initial value of manual baseline control or auto adjusting baseline. This value would be used only if it + * have not been changed earlier in real time. + * + * @return {@code true} if auto adjusting baseline enabled. + */ + public boolean isInitBaselineAutoAdjustEnabled() { + return initBaselineAutoAdjustEnabled; + } + + /** + * Sets initial value of manual baseline control or auto adjusting baseline. + */ + public void setInitBaselineAutoAdjustEnabled(boolean initBaselineAutoAdjustEnabled) { + this.initBaselineAutoAdjustEnabled = initBaselineAutoAdjustEnabled; + } + + /** + * Gets initial value of time which we would wait before the actual topology change. But it would be reset if new + * discovery event happened. (node join/exit). This value would be used only if it have not been changed earlier in + * real time. + * + * @return Timeout of wait the actual topology change. 
+ */ + public long getInitBaselineAutoAdjustTimeout() { + return initBaselineAutoAdjustTimeout; + } + + /** + * Sets initial value of time which we would wait before the actual topology change. + */ + public void setInitBaselineAutoAdjustTimeout(long initBaselineAutoAdjustTimeout) { + this.initBaselineAutoAdjustTimeout = initBaselineAutoAdjustTimeout; + } + + /** + * Gets initial value of time which we would wait from the first discovery event in the chain. If we achieved it + * than we would change BLAT right away (no matter were another node join/exit happened or not). This value would be + * used only if it have not been changed earlier in real time. + * + * @return Timeout of wait the actual topology change. + */ + public long getInitBaselineAutoAdjustMaxTimeout() { + return initBaselineAutoAdjustMaxTimeout; + } + + /** + * Sets initial value of time which we would wait from the first discovery event in the chain. + */ + public void setInitBaselineAutoAdjustMaxTimeout(long initBaselineAutoAdjustMaxTimeout) { + this.initBaselineAutoAdjustMaxTimeout = initBaselineAutoAdjustMaxTimeout; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteConfiguration.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 9651290..744f858 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor; +import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor; import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage; import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; @@ -214,6 +215,13 @@ public interface GridKernalContext extends Iterable<GridComponent> { public DistributedMetaStorage distributedMetastorage(); /** + * Gets distributed configuration processor. + * + * @return Distributed configuration processor. + */ + public DistributedConfigurationProcessor distributedConfiguration(); + + /** * Gets task session processor. * * @return Session processor. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index cc18d49..85e02f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor; import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage; import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor; import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; @@ -223,6 +224,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable @GridToStringInclude private DistributedMetaStorage distributedMetastorage; + /** Global metastorage. 
*/ + @GridToStringInclude + private DistributedConfigurationProcessor distributedConfigurationProcessor; + /** */ @GridToStringInclude private GridTaskSessionProcessor sesProc; @@ -609,6 +614,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable stateProc = (GridClusterStateProcessor)comp; else if (comp instanceof DistributedMetaStorage) distributedMetastorage = (DistributedMetaStorage)comp; + else if (comp instanceof DistributedConfigurationProcessor) + distributedConfigurationProcessor = (DistributedConfigurationProcessor)comp; else if (comp instanceof GridTaskSessionProcessor) sesProc = (GridTaskSessionProcessor)comp; else if (comp instanceof GridPortProcessor) @@ -764,6 +771,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public DistributedConfigurationProcessor distributedConfiguration() { + return distributedConfigurationProcessor; + } + + /** {@inheritDoc} */ @Override public GridTaskSessionProcessor session() { return sesProc; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 30af4f4..e427810 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -149,6 +149,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl; import 
org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor; import org.apache.ignite.internal.processors.nodevalidation.OsDiscoveryNodeValidationProcessor; @@ -1037,6 +1038,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { startProcessor(createComponent(PlatformProcessor.class, ctx)); startProcessor(new GridMarshallerMappingProcessor(ctx)); startProcessor(new DistributedMetaStorageImpl(ctx)); + startProcessor(new DistributedConfigurationProcessor(ctx)); // Start plugins. for (PluginProvider provider : ctx.plugins().allProviders()) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java new file mode 100644 index 0000000..99d8929 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedBaselineConfiguration.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.cluster; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty; +import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; + +import static org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty.detachedProperty; +import static org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty.detachedProperty; + +/** + * Distributed baseline configuration. + */ +public class DistributedBaselineConfiguration { + /** Value of manual baseline control or auto adjusting baseline. */ + private DistributedBooleanProperty baselineAutoAdjustEnabled; + + /** + * Value of time which we would wait before the actual topology change since last discovery event(node join/exit). + */ + private DistributedLongProperty baselineAutoAdjustTimeout; + + /** Value of time which we would wait from the first discovery event in the chain(node join/exit). */ + private DistributedLongProperty baselineAutoAdjustMaxTimeout; + + /** + * @param cfg Static config. + * @param isp Subscription processor. 
+ */ + public DistributedBaselineConfiguration(IgniteConfiguration cfg, GridInternalSubscriptionProcessor isp) { + baselineAutoAdjustEnabled = detachedProperty("baselineAutoAdjustEnabled", cfg.isInitBaselineAutoAdjustEnabled()); + baselineAutoAdjustTimeout = detachedProperty("baselineAutoAdjustTimeout", cfg.getInitBaselineAutoAdjustTimeout()); + baselineAutoAdjustMaxTimeout = detachedProperty("baselineAutoAdjustMaxTimeout", cfg.getInitBaselineAutoAdjustMaxTimeout()); + + isp.registerDistributedConfigurationListener( + dispatcher -> { + dispatcher.registerProperty(baselineAutoAdjustEnabled); + dispatcher.registerProperty(baselineAutoAdjustTimeout); + dispatcher.registerProperty(baselineAutoAdjustMaxTimeout); + } + ); + } + + /** + * @return Value of manual baseline control or auto adjusting baseline. + */ + public boolean isBaselineAutoAdjustEnabled() { + return baselineAutoAdjustEnabled.value(); + } + + /** + * @param baselineAutoAdjustEnabled Value of manual baseline control or auto adjusting baseline. + * @throws IgniteCheckedException if failed. + */ + public void setBaselineAutoAdjustEnabled(boolean baselineAutoAdjustEnabled) throws IgniteCheckedException { + this.baselineAutoAdjustEnabled.propagate(baselineAutoAdjustEnabled); + } + + /** + * @return Value of time which we would wait before the actual topology change since last discovery event(node + * join/exit). + */ + public long getBaselineAutoAdjustTimeout() { + return baselineAutoAdjustTimeout.value(); + } + + /** + * @param baselineAutoAdjustTimeout Value of time which we would wait before the actual topology change since last + * discovery event(node join/exit). + * @throws IgniteCheckedException If failed. + */ + public void setBaselineAutoAdjustTimeout(long baselineAutoAdjustTimeout) throws IgniteCheckedException { + this.baselineAutoAdjustTimeout.propagate(baselineAutoAdjustTimeout); + } + + /** + * @return Value of time which we would wait from the first discovery event in the chain(node join/exit). 
+ */ + public long getBaselineAutoAdjustMaxTimeout() { + return baselineAutoAdjustMaxTimeout.value(); + } + + /** + * @param baselineAutoAdjustMaxTimeout Value of time which we would wait from the first discovery event in the + * chain(node join/exit). + * @throws IgniteCheckedException If failed. + */ + public void setBaselineAutoAdjustMaxTimeout(long baselineAutoAdjustMaxTimeout) throws IgniteCheckedException { + this.baselineAutoAdjustMaxTimeout.propagate(baselineAutoAdjustMaxTimeout); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java index d79710d..60eec0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java @@ -361,6 +361,11 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> } /** {@inheritDoc} */ + @Override public DistributedBaselineConfiguration baselineConfiguration() { + return cluster.baselineConfiguration(); + } + + /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { cluster = (IgniteClusterImpl)in.readObject(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java index b755258..2f5e63a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java @@ -93,6 +93,9 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus /** Minimal IgniteProductVersion supporting BaselineTopology */ private static final IgniteProductVersion MIN_BLT_SUPPORTING_VER = 
IgniteProductVersion.fromString("2.4.0"); + /** Distributed baseline configuration. */ + private DistributedBaselineConfiguration distributedBaselineConfiguration; + /** * Required by {@link Externalizable}. */ @@ -109,6 +112,8 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus cfg = ctx.config(); nodeLoc = new ClusterNodeLocalMapImpl(ctx); + + distributedBaselineConfiguration = new DistributedBaselineConfiguration(cfg, ctx.internalSubscriptionProcessor()); } /** {@inheritDoc} */ @@ -191,8 +196,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus boolean restart, int timeout, int maxConn) - throws IgniteException - { + throws IgniteException { try { return startNodesAsync0(file, restart, timeout, maxConn).get(); } @@ -213,8 +217,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus boolean restart, int timeout, int maxConn) - throws IgniteException - { + throws IgniteException { try { return startNodesAsync0(hosts, dflts, restart, timeout, maxConn).get(); } @@ -371,8 +374,8 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus } /** - * Verifies all nodes in current cluster topology support BaselineTopology feature - * so compatibilityMode flag is enabled to reset. + * Verifies all nodes in current cluster topology support BaselineTopology feature so compatibilityMode flag is + * enabled to reset. * * @param discoCache */ @@ -419,7 +422,8 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus } /** */ - @Nullable private Collection<Object> onlineBaselineNodesRequestedForRemoval(Collection<? extends BaselineNode> newBlt) { + @Nullable private Collection<Object> onlineBaselineNodesRequestedForRemoval( + Collection<? 
extends BaselineNode> newBlt) { BaselineTopology blt = ctx.state().clusterState().baselineTopology(); Set<Object> bltConsIds; @@ -600,10 +604,9 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus * @see IgniteCluster#startNodes(java.io.File, boolean, int, int) */ IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync0(File file, - boolean restart, - int timeout, - int maxConn) - { + boolean restart, + int timeout, + int maxConn) { A.notNull(file, "file"); A.ensure(file.exists(), "file doesn't exist."); A.ensure(file.isFile(), "file is a directory."); @@ -632,8 +635,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus @Nullable Map<String, Object> dflts, boolean restart, int timeout, - int maxConn) - { + int maxConn) { A.notNull(hosts, "hosts"); guard(); @@ -709,7 +711,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus Collections.<ClusterStartNodeResult>emptyList()); // Exceeding max line width for readability. - GridCompoundFuture<ClusterStartNodeResult, Collection<ClusterStartNodeResult>> fut = + GridCompoundFuture<ClusterStartNodeResult, Collection<ClusterStartNodeResult>> fut = new GridCompoundFuture<>(CU.<ClusterStartNodeResult>objectsReducer()); AtomicInteger cnt = new AtomicInteger(nodeCallCnt); @@ -733,12 +735,10 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus } /** - * Gets the all grid nodes that reside on the same physical computer as local grid node. - * Local grid node is excluded. - * <p> - * Detection of the same physical computer is based on comparing set of network interface MACs. - * If two nodes have the same set of MACs, Ignite considers these nodes running on the same - * physical computer. + * Gets the all grid nodes that reside on the same physical computer as local grid node. Local grid node is + * excluded. 
<p> Detection of the same physical computer is based on comparing set of network interface MACs. If two + * nodes have the same set of MACs, Ignite considers these nodes running on the same physical computer. + * * @return Grid nodes that reside on the same physical computer as local grid node. */ private Collection<ClusterNode> neighbors() { @@ -766,9 +766,8 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus */ private boolean runNextNodeCallable(final ConcurrentLinkedQueue<StartNodeCallable> queue, final GridCompoundFuture<ClusterStartNodeResult, Collection<ClusterStartNodeResult>> - comp, - final AtomicInteger cnt) - { + comp, + final AtomicInteger cnt) { StartNodeCallable call = queue.poll(); if (call == null) @@ -825,6 +824,11 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus } /** {@inheritDoc} */ + @Override public DistributedBaselineConfiguration baselineConfiguration() { + return distributedBaselineConfiguration; + } + + /** {@inheritDoc} */ @Override public String toString() { return "IgniteCluster [igniteInstanceName=" + ctx.igniteInstanceName() + ']'; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index 72f2f3b..0396c3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; import 
org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor; import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage; import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; @@ -307,6 +308,11 @@ public class StandaloneGridKernalContext implements GridKernalContext { } /** {@inheritDoc} */ + @Override public DistributedConfigurationProcessor distributedConfiguration() { + return null; + } + + /** {@inheritDoc} */ @Override public GridTaskSessionProcessor session() { return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DetachedPropertyException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DetachedPropertyException.java new file mode 100644 index 0000000..bcd9ae9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DetachedPropertyException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.configuration.distributed; + +import org.apache.ignite.IgniteCheckedException; + +/** + * Exception of distributed property still have not been attached to the processor. + */ +public class DetachedPropertyException extends IgniteCheckedException { + /** */ + private static final long serialVersionUID = 0L; + /** + * @param name Name of detached property. + */ + public DetachedPropertyException(String name) { + super("Property '" + name + "' is detached from the processor."); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedBooleanProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedBooleanProperty.java new file mode 100644 index 0000000..adf69c1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedBooleanProperty.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.configuration.distributed; + +import java.io.Serializable; +import org.apache.ignite.internal.util.lang.IgniteThrowableBiConsumer; + +/** + * Implementation of {@link DistributedProperty} for {@link Boolean}. + */ +public class DistributedBooleanProperty extends DistributedProperty<Boolean> { + + /** {@inheritDoc} */ + DistributedBooleanProperty(String name, Boolean val) { + super(name, val); + } + + /** + * @param name Name of property. + * @param initVal Initial initVal of property. + * @return Property detached from processor.(Distributed updating are not accessable). + */ + public static DistributedBooleanProperty detachedProperty(String name, Boolean initVal) { + return new DistributedBooleanProperty(name, initVal); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedComparableProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedComparableProperty.java new file mode 100644 index 0000000..2207edf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedComparableProperty.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.configuration.distributed; + +import java.io.Serializable; +import java.util.Objects; +import org.apache.ignite.internal.util.lang.IgniteThrowableBiConsumer; + +/** + * Implementation of {@link DistributedProperty} for {@link Comparable}. + */ +public class DistributedComparableProperty<T extends Comparable<T> & Serializable> extends DistributedProperty<T> { + + /** {@inheritDoc} */ + DistributedComparableProperty(String name, T initVal) { + super(name, initVal); + } + + /** */ + public boolean equalTo(T other) { + return Objects.equals(val, other); + } + + /** */ + public boolean nonEqualTo(T other) { + return !Objects.equals(val, other); + } + + /** */ + public boolean lessThan(T other) { + return val.compareTo(other) < 0; + } + + /** */ + public boolean lessOrEqualTo(T other) { + return val.compareTo(other) <= 0; + } + + /** */ + public boolean greaterThan(T other) { + return val.compareTo(other) > 0; + } + + /** */ + public boolean greaterOrEqualTo(T other) { + return val.compareTo(other) >= 0; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationLifecycleListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationLifecycleListener.java new file mode 100644 index 0000000..cbdda67 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationLifecycleListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.configuration.distributed; + +/** + * Lifecycle listener for distributed configuration. + */ +@FunctionalInterface +public interface DistributedConfigurationLifecycleListener { + /** + * Notifies that the processor is ready to register properties through the given {@code dispatcher}. + */ + void onReadyToRegister(DistributedPropertyDispatcher dispatcher); +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java new file mode 100644 index 0000000..9c8116d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationProcessor.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.configuration.distributed; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage; +import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener; +import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage; +import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor; +import org.apache.ignite.internal.util.lang.IgniteThrowableBiConsumer; + +import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.ACTUALIZE; +import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.CLUSTER_WIDE_UPDATE; +import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.REGISTER; + +/** + * Processor of distributed configuration. + * + * This class controls the lifecycle of {@link DistributedProperty} actualization across the whole cluster. + */ +public class DistributedConfigurationProcessor extends GridProcessorAdapter implements DistributedPropertyDispatcher { + /** Prefix of key for distributed meta storage.
*/ + private static final String DIST_CONF_PREFIX = "distrConf"; + + /** Properties storage. */ + private final Map<String, DistributedProperty> props = new ConcurrentHashMap<>(); + + /** Global metastorage. */ + private volatile DistributedMetaStorage distributedMetastorage; + + /** Max allowed action. All actions with a lower ordinal are allowed as well. */ + private volatile AllowableAction allowableAction = REGISTER; + + /** + * @param ctx Kernal context. + */ + public DistributedConfigurationProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + GridInternalSubscriptionProcessor isp = ctx.internalSubscriptionProcessor(); + + isp.registerDistributedMetastorageListener(new DistributedMetastorageLifecycleListener() { + @Override public void onReadyForRead(ReadableDistributedMetaStorage metastorage) { + distributedMetastorage = ctx.distributedMetastorage(); + + //Listener for handling of cluster wide change of specific properties. Do local update. + distributedMetastorage.listen( + (key) -> key.startsWith(DIST_CONF_PREFIX), + (String key, Serializable oldVal, Serializable newVal) -> { + DistributedProperty prop = props.get(toPropertyKey(key)); + + if (prop != null) + prop.localUpdate(newVal); + } + ); + + //Switch to actualize action and actualize already registered properties. + switchCurrentActionTo(ACTUALIZE); + + //Register and actualize properties waited for this service. + isp.getDistributedConfigurationListeners() + .forEach(listener -> listener.onReadyToRegister(DistributedConfigurationProcessor.this)); + + } + + @Override public void onReadyForWrite(DistributedMetaStorage metastorage) { + //Switch to cluster wide update action and do it on already registered properties. + switchCurrentActionTo(CLUSTER_WIDE_UPDATE); + } + }); + } + + /** + * Switches current action to the given one and does all actions from the old one (exclusive) to the new one.
+ * + * @param to New action to switch to. + */ + private synchronized void switchCurrentActionTo(AllowableAction to) { + AllowableAction oldAct = allowableAction; + + assert oldAct.ordinal() <= to.ordinal() : "Current action : " + oldAct + ", new action : " + to; + + allowableAction = to; + + for (AllowableAction action : AllowableAction.values()) { + if (action.ordinal() > oldAct.ordinal()) + props.values().forEach(prop -> doAction(action, prop)); + + if (action == to) + break; + } + } + + /** + * @param propKey Key of specific property. + * @return Property key for meta storage. + */ + private static String toMetaStorageKey(String propKey) { + return DIST_CONF_PREFIX + propKey; + } + + /** + * @param metaStorageKey Key from meta storage. + * @return Original property key. + */ + private static String toPropertyKey(String metaStorageKey) { + return metaStorageKey.substring(DIST_CONF_PREFIX.length()); + } + + /** + * Registers the property in this processor and executes on it every action allowed at the moment. + * + * @param prop Property to attach to processor. + * @param <T> Type of property. + */ + @Override public <T extends DistributedProperty> T registerProperty(T prop) { + doAllAllowableActions(prop); + + return prop; + } + + /** + * @param name Name of property. + * @param <T> Type of property. + * @return Registered property or {@code null} if there is no property with such name. + */ + public <T extends DistributedProperty> T getProperty(String name) { + return (T)props.get(name); + } + + /** + * Create and attach new long property. + * + * @param name Name of property. + * @param initVal Initial value of property. + * @return Attached new property. + */ + @Override public DistributedLongProperty registerLong(String name, Long initVal) { + return registerProperty(new DistributedLongProperty(name, initVal)); + } + + /** + * Create and attach new boolean property. + * + * @param name Name of property. + * @param initVal Initial value of property. + * @return Attached new property.
+ */ + @Override public DistributedBooleanProperty registerBoolean(String name, + Boolean initVal) { + return registerProperty(new DistributedBooleanProperty(name, initVal)); + } + + /** + * Executes all allowable actions up to the current one (inclusive) on given property. + * + * @param prop Property which action should be executed on. + */ + private void doAllAllowableActions(DistributedProperty prop) { + for (AllowableAction action : AllowableAction.values()) { + doAction(action, prop); + + if (action == allowableAction) + break; + } + } + + /** + * Do one given action on given property. + * + * @param act Action to execute. + * @param prop Property which action should be executed on. + */ + private void doAction(AllowableAction act, DistributedProperty prop) { + switch (act) { + case REGISTER: + doRegister(prop); + break; + case ACTUALIZE: + doActualize(prop); + break; + case CLUSTER_WIDE_UPDATE: + doClusterWideUpdate(prop); + break; + } + } + + /** + * Do register action on given property. + * + * Binds the property with this processor for further actualizing. + * @param prop Property which action should be executed on. + * @throws IllegalArgumentException If a property with the same name has been registered already. + */ + private void doRegister(DistributedProperty prop) { + //Check and insert must be one atomic step: with containsKey() followed by put() two concurrent + //registrations of the same name could both succeed and the second would replace the first. + if (props.putIfAbsent(prop.getName(), prop) != null) + throw new IllegalArgumentException("Property already exists : " + prop.getName()); + + prop.onAttached(); + } + + /** + * Do actualize action on given property. + * + * Reads actual value from metastore and, if present, sets it to local property. Read failure is only logged. + * + * @param prop Property which action should be executed on. + */ + private void doActualize(DistributedProperty prop) { + Serializable readVal = null; + try { + readVal = distributedMetastorage.read(toMetaStorageKey(prop.getName())); + } + catch (IgniteCheckedException e) { + log.error("Can not read value of property '" + prop.getName() + "'", e); + } + + if (readVal != null) + prop.localUpdate(readVal); + } + + /** + * Do cluster wide action on given property.
+ * + * Sets closure for cluster wide update action to given property. + * + * @param prop Property which action should be executed on. + */ + private void doClusterWideUpdate(DistributedProperty prop) { + prop.onReadyForUpdate( + (IgniteThrowableBiConsumer<String, Serializable>)(key, value) -> + distributedMetastorage.write(toMetaStorageKey(key), value) + ); + } + + /** + * This enum determines which actions are allowed for the processor at the current moment. + * + * Order is important. Each next action allows all previous actions as well. Current action can be changed only from + * previous to next. + */ + enum AllowableAction { + /** + * Only registration allowed. Actualization property from metastore and cluster wide update aren't allowed. + */ + REGISTER, + /** + * Registration and actualization property from metastore are allowed. Cluster wide update isn't allowed. + */ + ACTUALIZE, + /** + * Registration, actualization and cluster wide update are all allowed. + */ + CLUSTER_WIDE_UPDATE; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedLongProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedLongProperty.java new file mode 100644 index 0000000..c25b841 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedLongProperty.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.configuration.distributed; + +import java.io.Serializable; +import org.apache.ignite.internal.util.lang.IgniteThrowableBiConsumer; + +/** + * Implementation of {@link DistributedProperty} for {@link Long}. + */ +public class DistributedLongProperty extends DistributedComparableProperty<Long> { + + /** Creates a property with the given name and initial value. */ + DistributedLongProperty(String name, Long initVal) { + super(name, initVal); + } + + /** + * @param name Name of property. + * @param initVal Initial value of property. + * @return Property detached from processor (distributed updating is not accessible). + */ + public static DistributedLongProperty detachedProperty(String name, Long initVal) { + return new DistributedLongProperty(name, initVal); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedProperty.java new file mode 100644 index 0000000..af67b55 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedProperty.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.configuration.distributed; + +import java.io.Serializable; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.util.lang.IgniteThrowableBiConsumer; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.NotNull; + +/** + * Wrapper of some serializable property providing the ability to change its value across the whole cluster. + */ +public class DistributedProperty<T extends Serializable> { + /** Name of property. */ + private final String name; + /** Property value. */ + protected volatile T val; + /** Sign of attachment to the processor. */ + private volatile boolean attached = false; + /** + * Specific consumer for updating the value in cluster. It is null while the property is not ready for cluster wide + * update. + */ + @GridToStringExclude + private volatile IgniteThrowableBiConsumer<String, Serializable> clusterWideUpdater; + + /** + * @param name Name of property. + * @param initVal Initial value of property. + */ + public DistributedProperty(String name, T initVal) { + this.val = initVal; + this.name = name; + } + + /** + * Change value across whole cluster. + * + * @param newVal Value which this property should be changed on.
+ * @return {@code true} if value was successfully updated and {@code false} if cluster wide update has not been + * permitted yet. + * @throws DetachedPropertyException If this property has not been attached to processor yet, please call {@link + * DistributedConfigurationProcessor#registerProperty(DistributedProperty)} before this method. + * @throws IgniteCheckedException If failed during cluster wide update. + */ + public boolean propagate(T newVal) throws IgniteCheckedException { + if (!attached) + throw new DetachedPropertyException(name); + + if (clusterWideUpdater == null) + return false; + + clusterWideUpdater.accept(name, newVal); + + return true; + } + + /** + * @return Current property value. + */ + public T value() { + return val; + } + + /** + * @return Name of property. + */ + public String getName() { + return name; + } + + /** + * Marks this property as attached to the processor. + */ + void onAttached() { + attached = true; + } + + /** + * Called when this property becomes ready for cluster wide update. + * + * @param updater Consumer for update value across cluster. + */ + void onReadyForUpdate(@NotNull IgniteThrowableBiConsumer<String, Serializable> updater) { + this.clusterWideUpdater = updater; + } + + /** + * Updates only the local value without updating the rest of the cluster. + * + * @param newVal New value.
+ */ + void localUpdate(Serializable newVal) { + val = (T)newVal; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DistributedProperty.class, this); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedPropertyDispatcher.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedPropertyDispatcher.java new file mode 100644 index 0000000..3178f75 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedPropertyDispatcher.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.configuration.distributed; + +import org.apache.ignite.IgniteCheckedException; + +/** + * Dispatcher of distributed properties. + * + * Holds all registered properties of distributed configuration. + */ +public interface DistributedPropertyDispatcher { + /** + * Attach already created property. + * + * @param prop Property to attach to processor. + * @param <T> Type of property value.
+ */ + public <T extends DistributedProperty> T registerProperty(T prop); + + /** + * Create and attach new long property. + * + * @param name Name of property. + * @param initVal Initial value of property. + * @return Attached new property. + */ + public DistributedLongProperty registerLong(String name, Long initVal); + + /** + * Create and attach new boolean property. + * + * @param name Name of property. + * @param initVal Initial value of property. + * @return Attached new property. + */ + public DistributedBooleanProperty registerBoolean(String name, Boolean initVal); +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/ReadOnlyDistributedMetaStorageBridge.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/ReadOnlyDistributedMetaStorageBridge.java index 84c955d..ae5261b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/ReadOnlyDistributedMetaStorageBridge.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/ReadOnlyDistributedMetaStorageBridge.java @@ -42,7 +42,7 @@ class ReadOnlyDistributedMetaStorageBridge implements DistributedMetaStorageBrid Comparator.comparing(item -> item.key); /** */ - private DistributedMetaStorageHistoryItem[] locFullData; + private DistributedMetaStorageHistoryItem[] locFullData = EMPTY_ARRAY; /** */ private DistributedMetaStorageVersion ver; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index fada9d1..11ff87e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@ 
-661,6 +661,12 @@ public class PlatformConfigurationUtils { cfg.setMvccVacuumThreadCount(in.readInt()); if (in.readBoolean()) cfg.setSystemWorkerBlockedTimeout(in.readLong()); + if (in.readBoolean()) + cfg.setInitBaselineAutoAdjustEnabled(in.readBoolean()); + if (in.readBoolean()) + cfg.setInitBaselineAutoAdjustTimeout(in.readLong()); + if (in.readBoolean()) + cfg.setInitBaselineAutoAdjustMaxTimeout(in.readLong()); int sqlSchemasCnt = in.readInt(); @@ -1250,6 +1256,12 @@ public class PlatformConfigurationUtils { } else { w.writeBoolean(false); } + w.writeBoolean(true); + w.writeBoolean(cfg.isInitBaselineAutoAdjustEnabled()); + w.writeBoolean(true); + w.writeLong(cfg.getInitBaselineAutoAdjustTimeout()); + w.writeBoolean(true); + w.writeLong(cfg.getInitBaselineAutoAdjustMaxTimeout()); if (cfg.getSqlSchemas() == null) w.writeInt(-1); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java index 5e48547..7f89ed1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/subscription/GridInternalSubscriptionProcessor.java @@ -22,12 +22,14 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener; import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener; import org.jetbrains.annotations.NotNull; +import static 
java.util.Objects.requireNonNull; + /** - * Processor enables grid components to register listeners for events - * generated by other components on local node. + * Processor enables grid components to register listeners for events generated by other components on local node. * * It starts very first during node startup procedure so any components could use it. * @@ -43,6 +45,10 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter { /** */ private final List<DatabaseLifecycleListener> dbListeners = new ArrayList<>(); + /** + * Listeners of distributed configuration controlled by {@link org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor}. + */ + private List<DistributedConfigurationLifecycleListener> distributedConfigurationListeners = new ArrayList<>(); /** * @param ctx Kernal context. @@ -53,8 +59,7 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter { /** */ public void registerMetastorageListener(@NotNull MetastorageLifecycleListener metastorageListener) { - if (metastorageListener == null) - throw new NullPointerException("Metastorage subscriber should be not-null."); + requireNonNull(metastorageListener, "Metastorage subscriber should be not-null."); metastorageListeners.add(metastorageListener); } @@ -66,8 +71,7 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter { /** */ public void registerDistributedMetastorageListener(@NotNull DistributedMetastorageLifecycleListener lsnr) { - if (lsnr == null) - throw new NullPointerException("Global metastorage subscriber should be not-null."); + requireNonNull(lsnr, "Global metastorage subscriber should be not-null."); distributedMetastorageListeners.add(lsnr); } @@ -79,8 +83,7 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter { /** */ public void registerDatabaseListener(@NotNull DatabaseLifecycleListener databaseListener) { - if (databaseListener == null) - throw 
new NullPointerException("Database subscriber should be not-null."); + requireNonNull(databaseListener, "Database subscriber should be not-null."); dbListeners.add(databaseListener); } @@ -89,4 +92,17 @@ public class GridInternalSubscriptionProcessor extends GridProcessorAdapter { public List<DatabaseLifecycleListener> getDatabaseListeners() { return dbListeners; } + + /** Registers a not-null listener of the distributed configuration lifecycle. */ + public void registerDistributedConfigurationListener( + @NotNull DistributedConfigurationLifecycleListener lifecycleListener) { + requireNonNull(lifecycleListener, "Distributed configuration subscriber should be not-null."); + + distributedConfigurationListeners.add(lifecycleListener); + } + + /** @return Registered listeners of the distributed configuration lifecycle. */ + public List<DistributedConfigurationLifecycleListener> getDistributedConfigurationListeners() { + return distributedConfigurationListeners; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableBiConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableBiConsumer.java new file mode 100644 index 0000000..2733759 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/IgniteThrowableBiConsumer.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.lang; + +import java.io.Serializable; +import org.apache.ignite.IgniteCheckedException; + +/** + * Represents an operation that accepts two input arguments and returns no result. Unlike most other functional + * interfaces, {@code IgniteThrowableBiConsumer} is expected to operate via side-effects. + * + * @param <E> Type of the first closure parameter. + * @param <R> Type of the second closure parameter. + */ +public interface IgniteThrowableBiConsumer<E, R> extends Serializable { + /** + * Consumer body. + * @param e First consumer parameter. + * @param r Second consumer parameter. + * @throws IgniteCheckedException if body execution has failed. + */ + public void accept(E e, R r) throws IgniteCheckedException; +} diff --git a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider index 1c03b7c5..e9e9d41 100644 --- a/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider +++ b/modules/core/src/test/java/META-INF/services/org.apache.ignite.plugin.PluginProvider @@ -1,3 +1,4 @@ org.apache.ignite.spi.discovery.tcp.TestReconnectPluginProvider org.apache.ignite.internal.processors.cache.persistence.standbycluster.IgniteStandByClusterTest$StanByClusterTestProvider org.apache.ignite.internal.processors.cache.persistence.wal.memtracker.PageMemoryTrackerPluginProvider +org.apache.ignite.internal.processors.configuration.distributed.TestDistibutedConfigurationPlugin diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationTest.java new file mode 100644 index 0000000..4e563e9 --- /dev/null +++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/DistributedConfigurationTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.configuration.distributed; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * + */ +@RunWith(JUnit4.class) +public class DistributedConfigurationTest extends GridCommonAbstractTest { + /** */ + private static final String TEST_PROP = "someLong"; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration 
getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + DataStorageConfiguration storageCfg = new DataStorageConfiguration(); + + storageCfg.getDefaultDataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(500L * 1024 * 1024); + + cfg.setDataStorageConfiguration(storageCfg); + + return cfg; + } + +// /** +// * @throws Exception If failed. +// */ +// @Test +// public void test() throws Exception { +// IgniteEx ignite0 = startGrid(0); +// IgniteEx ignite1 = startGrid(1); +// +// ignite0.cluster().active(true); +// +// Assert.assertEquals(0, ignite0.cluster().baselineConfiguration().getBaselineAutoAdjustTimeout()); +// Assert.assertEquals(0, ignite1.cluster().baselineConfiguration().getBaselineAutoAdjustTimeout()); +// +// ignite0.cluster().baselineConfiguration().setBaselineAutoAdjustTimeout(2); +// +// Assert.assertEquals(2, ignite0.cluster().baselineConfiguration().getBaselineAutoAdjustTimeout()); +// Assert.assertEquals(2, ignite1.cluster().baselineConfiguration().getBaselineAutoAdjustTimeout()); +// +// stopAllGrids(); +// +// ignite0 = startGrid(0); +// ignite1 = startGrid(1); +// +// ignite0.cluster().active(true); +// +// Assert.assertEquals(2, ignite0.cluster().baselineConfiguration().getBaselineAutoAdjustTimeout()); +// Assert.assertEquals(2, ignite1.cluster().baselineConfiguration().getBaselineAutoAdjustTimeout()); +// } + + /** + * @throws Exception If failed. 
+ */ + @Test + public void testSuccessClusterWideUpdate() throws Exception { + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + ignite0.cluster().active(true); + + DistributedLongProperty long0 = ignite0.context().distributedConfiguration().registerLong(TEST_PROP, 0L); + DistributedLongProperty long1 = ignite1.context().distributedConfiguration().registerLong(TEST_PROP, 0L); + + assertEquals(0, long0.value().longValue()); + assertEquals(0, long1.value().longValue()); + + assertTrue(long0.propagate(2L)); + + //Value changed on whole grid. + assertEquals(2L, long0.value().longValue()); + assertEquals(2L, long1.value().longValue()); + + stopAllGrids(); + + ignite0 = startGrid(0); + ignite1 = startGrid(1); + + ignite0.cluster().active(true); + + long0 = ignite0.context().distributedConfiguration().registerLong(TEST_PROP, 0L); + long1 = ignite1.context().distributedConfiguration().registerLong(TEST_PROP, 0L); + + assertEquals(2, long0.value().longValue()); + assertEquals(2, long1.value().longValue()); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testReadLocalValueOnInactiveGrid() throws Exception { + IgniteEx ignite0 = startGrid(0); + startGrid(1); + + ignite0.cluster().active(true); + + DistributedLongProperty long0 = ignite0.context().distributedConfiguration().registerLong(TEST_PROP, 0L); + + assertEquals(0, long0.value().longValue()); + + assertTrue(long0.propagate(2L)); + + stopAllGrids(); + + ignite0 = startGrid(0); + + long0 = ignite0.context().distributedConfiguration().registerLong(TEST_PROP, 0L); + + assertEquals(2, long0.value().longValue()); + + //Cluster wide update have not initialized yet. + assertFalse(long0.propagate(3L)); + } + + /** + * @throws Exception If failed. 
+ */ + @Test + public void testRegisterExistedProperty() throws Exception { + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + ignite0.cluster().active(true); + + DistributedLongProperty long0 = ignite0.context().distributedConfiguration().registerLong(TEST_PROP, 0L); + + assertEquals(0, long0.value().longValue()); + + assertTrue(long0.propagate(2L)); + + DistributedLongProperty long1 = ignite1.context().distributedConfiguration().registerLong(TEST_PROP, 0L); + + //Already changed to 2. + assertEquals(2, long1.value().longValue()); + } + + /** + * @throws Exception If failed. + */ + @Test(expected = DetachedPropertyException.class) + public void testNotAttachedProperty() throws Exception { + DistributedLongProperty long0 = DistributedLongProperty.detachedProperty(TEST_PROP, 0L); + assertEquals(0, long0.value().longValue()); + + long0.propagate(1L); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testReadInitValueBeforeOnReadyForReady() throws Exception { + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + ignite0.cluster().active(true); + + DistributedLongProperty long0 = ignite0.context().distributedConfiguration().registerLong(TEST_PROP, 0L); + + assertEquals(0, long0.value().longValue()); + + long0.propagate(2L); + + stopAllGrids(); + + TestDistibutedConfigurationPlugin.supplier = (ctx) -> { + DistributedLongProperty longProperty = null; + longProperty = ctx.distributedConfiguration().registerLong(TEST_PROP, -1L); + + //Read init value because onReadyForReady has not happened yet. 
+ assertEquals(-1, longProperty.value().longValue()); + + try { + assertFalse(longProperty.propagate(1L)); + } + catch (IgniteCheckedException e) { + throw new RuntimeException(e); + } + }; + + ignite0 = startGrid(0); + ignite1 = startGrid(1); + + long0 = ignite0.context().distributedConfiguration().getProperty(TEST_PROP); + DistributedLongProperty long1 = ignite1.context().distributedConfiguration().getProperty(TEST_PROP); + + //After start it should read from local storage. + assertEquals(2, long0.value().longValue()); + assertEquals(2, long1.value().longValue()); + } + +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/TestDistibutedConfigurationPlugin.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/TestDistibutedConfigurationPlugin.java new file mode 100644 index 0000000..d86a3c3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/configuration/distributed/TestDistibutedConfigurationPlugin.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.configuration.distributed; + +import java.io.Serializable; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.plugin.CachePluginContext; +import org.apache.ignite.plugin.CachePluginProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.IgnitePlugin; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.plugin.PluginValidationException; +import org.jetbrains.annotations.Nullable; + +/** + * Test plugin provider that passes the node's kernal context to the static {@code supplier} callback on plugin start. + * + * @author @java.author + * @version @java.version + */ +public class TestDistibutedConfigurationPlugin implements PluginProvider { + /** Kernal context of the node, captured in {@code initExtensions}. */ + private GridKernalContext igniteCtx; + + public static Consumer<GridKernalContext> supplier = (ctx) -> { + }; + + /** {@inheritDoc} */ + @Override public String name() { + return "TestDistibutedConfigurationPlugin"; + } + + /** {@inheritDoc} */ + @Override public String version() { + return "1.0"; + } + + /** {@inheritDoc} */ + @Override public String copyright() { + return ""; + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { + igniteCtx = ((IgniteKernal)ctx.grid()).context(); + } + + /** {@inheritDoc} */ + @Override public void start(PluginContext ctx) throws IgniteCheckedException { + supplier.accept(igniteCtx); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + // No-op + } + + /** {@inheritDoc} */ + @Override public void onIgniteStart() throws IgniteCheckedException { + // No-op + } + + /** {@inheritDoc} */ + @Override public void onIgniteStop(boolean cancel) { + // No-op + } + + /** 
{@inheritDoc} */ + @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) { + return null; + } + + /** {@inheritDoc} */ + @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) { + // No-op + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node) throws PluginValidationException { + // No-op + } + + /** {@inheritDoc} */ + @Nullable @Override public Object createComponent(PluginContext ctx, Class cls) { + return null; + } + + /** {@inheritDoc} */ + @Override public IgnitePlugin plugin() { + return new IgnitePlugin() { + }; + } + + /** {@inheritDoc} */ + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + return null; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java index 7f07199..d0d3a5a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java @@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterStartNodeResult; import org.apache.ignite.internal.cluster.ClusterGroupEx; +import org.apache.ignite.internal.cluster.DistributedBaselineConfiguration; import org.apache.ignite.internal.cluster.IgniteClusterEx; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteFuture; @@ -191,6 +192,11 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx { } /** {@inheritDoc} */ + @Override public DistributedBaselineConfiguration baselineConfiguration() { + return null; + } + + /** {@inheritDoc} */ @Override public boolean isAsync() { throw new UnsupportedOperationException("Operation is not 
supported yet."); } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs index 7397863..f96a4c7 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ApiParity/ClusterParityTest.cs @@ -32,7 +32,8 @@ "startNodes", "startNodesAsync", "stopNodes", - "restartNodes" + "restartNodes", + "baselineConfiguration" }; /** Members that are missing on .NET side and should be added in future. */ diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs index f0f3b7c..2665c25 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs @@ -262,6 +262,10 @@ namespace Apache.Ignite.Core.Tests Assert.AreEqual(cfg.MvccVacuumFrequency, resCfg.MvccVacuumFrequency); Assert.AreEqual(cfg.MvccVacuumThreadCount, resCfg.MvccVacuumThreadCount); + Assert.AreEqual(cfg.InitBaselineAutoAdjustEnabled, resCfg.InitBaselineAutoAdjustEnabled); + Assert.AreEqual(cfg.InitBaselineAutoAdjustTimeout, resCfg.InitBaselineAutoAdjustTimeout); + Assert.AreEqual(cfg.InitBaselineAutoAdjustMaxTimeout, resCfg.InitBaselineAutoAdjustMaxTimeout); + Assert.IsNotNull(resCfg.SqlSchemas); Assert.AreEqual(2, resCfg.SqlSchemas.Count); Assert.IsTrue(resCfg.SqlSchemas.Contains("SCHEMA_3")); diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs index 63bf794..987fc21 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs @@ -216,6 +216,15 @@ namespace Apache.Ignite.Core /** MVCC vacuum thread count. 
*/ private int? _mvccVacuumThreadCnt; + /** Initial value of manual baseline control or auto adjusting baseline. */ + private bool? _initBaselineAutoAdjustEnabled; + + /** Initial value of time which we would wait before the actual topology change since last discovery event. */ + private long? _initBaselineAutoAdjustTimeout; + + /** Initial value of time which we would wait from the first discovery event in the chain(node join/exit). */ + private long? _initBaselineAutoAdjustMaxTimeout; + /// <summary> /// Default network retry count. /// </summary> @@ -252,6 +261,21 @@ namespace Apache.Ignite.Core public const int DefaultMvccVacuumThreadCount = 2; /// <summary> + /// Default value for <see cref="InitBaselineAutoAdjustEnabled"/> property. + /// </summary> + public const bool DefaultInitBaselineAutoAdjustEnabled = false; + + /// <summary> + /// Default value for <see cref="InitBaselineAutoAdjustTimeout"/> property. + /// </summary> + public const long DefaultInitBaselineAutoAdjustTimeout = 0; + + /// <summary> + /// Default value for <see cref="InitBaselineAutoAdjustMaxTimeout"/> property. + /// </summary> + public const long DefaultInitBaselineAutoAdjustMaxTimeout = 0; + + /// <summary> /// Initializes a new instance of the <see cref="IgniteConfiguration"/> class. 
/// </summary> public IgniteConfiguration() @@ -333,6 +357,9 @@ namespace Apache.Ignite.Core writer.WriteLongNullable(_mvccVacuumFreq); writer.WriteIntNullable(_mvccVacuumThreadCnt); writer.WriteTimeSpanAsLongNullable(_sysWorkerBlockedTimeout); + writer.WriteBooleanNullable(_initBaselineAutoAdjustEnabled); + writer.WriteLongNullable(_initBaselineAutoAdjustTimeout); + writer.WriteLongNullable(_initBaselineAutoAdjustMaxTimeout); if (SqlSchemas == null) writer.WriteInt(-1); @@ -722,6 +749,9 @@ namespace Apache.Ignite.Core _mvccVacuumFreq = r.ReadLongNullable(); _mvccVacuumThreadCnt = r.ReadIntNullable(); _sysWorkerBlockedTimeout = r.ReadTimeSpanNullable(); + _initBaselineAutoAdjustEnabled = r.ReadBooleanNullable(); + _initBaselineAutoAdjustTimeout = r.ReadLongNullable(); + _initBaselineAutoAdjustMaxTimeout = r.ReadLongNullable(); int sqlSchemasCnt = r.ReadInt(); @@ -1655,5 +1685,35 @@ namespace Apache.Ignite.Core /// </summary> [SuppressMessage("Microsoft.Usage", "CA2227:CollectionPropertiesShouldBeReadOnly")] public ICollection<string> SqlSchemas { get; set; } + + /// <summary> + /// Initial value of manual baseline control or auto adjusting baseline. + /// </summary> + [DefaultValue(DefaultInitBaselineAutoAdjustEnabled)] + public bool InitBaselineAutoAdjustEnabled + { + get { return _initBaselineAutoAdjustEnabled ?? DefaultInitBaselineAutoAdjustEnabled; } + set { _initBaselineAutoAdjustEnabled = value; } + } + + /// <summary> + /// Initial value of time which we would wait before the actual topology change since last discovery event. + /// </summary> + [DefaultValue(DefaultInitBaselineAutoAdjustTimeout)] + public long InitBaselineAutoAdjustTimeout + { + get { return _initBaselineAutoAdjustTimeout ?? DefaultInitBaselineAutoAdjustTimeout; } + set { _initBaselineAutoAdjustTimeout = value; } + } + + /// <summary> + /// Initial value of time which we would wait from the first discovery event in the chain(node join/exit). 
+ /// </summary> + [DefaultValue(DefaultInitBaselineAutoAdjustMaxTimeout)] + public long InitBaselineAutoAdjustMaxTimeout + { + get { return _initBaselineAutoAdjustMaxTimeout ?? DefaultInitBaselineAutoAdjustMaxTimeout; } + set { _initBaselineAutoAdjustMaxTimeout = value; } + } } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd index 5f4a439f..efde394 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd @@ -2337,6 +2337,21 @@ <xs:documentation>Whether Java console output should be redirected to Console.Out and Console.Error.</xs:documentation> </xs:annotation> </xs:attribute> + <xs:attribute name="initBaselineAutoAdjustEnabled" type="xs:boolean"> + <xs:annotation> + <xs:documentation>Initial value of manual baseline control or auto adjusting baseline.</xs:documentation> + </xs:annotation> + </xs:attribute> + <xs:attribute name="initBaselineAutoAdjustTimeout" type="xs:long"> + <xs:annotation> + <xs:documentation>Initial value of time which we would wait before the actual topology change since last discovery event.</xs:documentation> + </xs:annotation> + </xs:attribute> + <xs:attribute name="initBaselineAutoAdjustMaxTimeout" type="xs:long"> + <xs:annotation> + <xs:documentation>Initial value of time which we would wait from the first discovery event in the chain(node join/exit).</xs:documentation> + </xs:annotation> + </xs:attribute> </xs:complexType> </xs:element> </xs:schema>