Reworked cluster activation/deactivation.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1337901f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1337901f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1337901f Branch: refs/heads/master Commit: 1337901f04c866e20093b20449c0872f089fb64b Parents: 54572c3 Author: sboikov <[email protected]> Authored: Wed Jul 5 11:19:43 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Jul 5 11:19:43 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/GridComponent.java | 4 +- .../ignite/internal/GridPluginComponent.java | 2 +- .../apache/ignite/internal/IgniteKernal.java | 33 +- .../internal/managers/GridManagerAdapter.java | 2 +- .../internal/managers/discovery/DiscoCache.java | 17 +- .../discovery/DiscoveryLocalJoinData.java | 104 ++ .../discovery/GridDiscoveryManager.java | 128 +- .../pagemem/store/IgnitePageStoreManager.java | 3 +- .../processors/GridProcessorAdapter.java | 2 +- .../cache/CacheAffinitySharedManager.java | 67 +- .../processors/cache/CacheGroupContext.java | 4 +- .../processors/cache/CacheGroupData.java | 4 +- .../cache/ChangeGlobalStateMessage.java | 120 -- .../processors/cache/ClusterCachesInfo.java | 490 +++++-- .../internal/processors/cache/ClusterState.java | 38 - .../cache/DynamicCacheChangeRequest.java | 52 +- .../processors/cache/ExchangeActions.java | 37 +- .../processors/cache/GridCacheEventManager.java | 2 - .../cache/GridCacheEvictionManager.java | 1 - .../processors/cache/GridCacheIoManager.java | 13 +- .../processors/cache/GridCacheMvccManager.java | 9 +- .../GridCachePartitionExchangeManager.java | 423 +++--- .../processors/cache/GridCacheProcessor.java | 177 ++- .../cache/GridCacheSharedContext.java | 60 +- .../cache/GridCacheSharedManager.java | 6 - .../cache/GridCacheSharedManagerAdapter.java | 16 - .../processors/cache/PendingDiscoveryEvent.java | 61 + .../processors/cache/StateChangeRequest.java | 77 ++ .../binary/CacheObjectBinaryProcessorImpl.java | 4 +- .../distributed/GridCacheTxRecoveryFuture.java | 1 - .../distributed/dht/GridDhtCacheAdapter.java | 1 - .../cache/distributed/dht/GridDhtGetFuture.java | 1 - .../distributed/dht/GridDhtGetSingleFuture.java | 2 - .../dht/GridDhtPartitionTopologyImpl.java | 13 +- .../dht/GridDhtTopologyFutureAdapter.java | 2 +- .../dht/GridPartitionedSingleGetFuture.java | 3 - .../GridNearAtomicAbstractUpdateFuture.java | 1 - .../dht/preloader/GridDhtForceKeysFuture.java | 1 - .../dht/preloader/GridDhtPartitionDemander.java | 2 + .../GridDhtPartitionsExchangeFuture.java | 228 +++- .../preloader/GridDhtPartitionsFullMessage.java | 44 +- .../GridDhtPartitionsSingleMessage.java | 38 +- .../dht/preloader/GridDhtPreloader.java | 2 +- .../distributed/near/GridNearGetFuture.java | 2 - .../near/GridNearTxPrepareRequest.java | 1 - .../GridCacheDatabaseSharedManager.java | 105 +- .../persistence/GridCacheOffheapManager.java | 5 +- .../IgniteCacheDatabaseSharedManager.java | 64 +- .../persistence/IgniteCacheSnapshotManager.java | 12 +- .../persistence/file/FilePageStoreManager.java | 14 +- .../wal/FileWriteAheadLogManager.java | 8 - .../query/GridCacheDistributedQueryManager.java | 4 +- .../store/GridCacheStoreManagerAdapter.java | 1 - .../cache/version/GridCacheVersionManager.java | 6 - .../cacheobject/IgniteCacheObjectProcessor.java | 5 - .../IgniteCacheObjectProcessorImpl.java | 5 - .../cluster/ChangeGlobalStateFinishMessage.java | 86 ++ .../cluster/ChangeGlobalStateMessage.java | 140 ++ .../processors/cluster/ClusterProcessor.java | 3 +- .../cluster/DiscoveryDataClusterState.java | 157 +++ .../cluster/GridClusterStateProcessor.java | 1122 ++++++--------- .../cluster/IgniteChangeGlobalStateSupport.java | 3 +- .../datastructures/DataStructuresProcessor.java | 6 +- .../datastructures/GridCacheAtomicLongImpl.java | 2 +- .../GridCacheAtomicReferenceImpl.java | 2 +- .../GridCacheAtomicSequenceImpl.java | 2 +- .../GridCacheAtomicStampedImpl.java | 2 +- .../GridCacheCountDownLatchImpl.java | 2 +- .../datastructures/GridCacheLockImpl.java | 4 +- .../datastructures/GridCacheQueueAdapter.java | 1 - .../datastructures/GridCacheSemaphoreImpl.java | 2 +- .../datastructures/GridCacheSetImpl.java | 1 - .../internal/processors/igfs/IgfsImpl.java | 2 - .../internal/processors/igfs/IgfsProcessor.java | 2 +- .../processors/query/GridQueryProcessor.java | 4 +- .../processors/rest/GridRestProcessor.java | 2 +- .../cluster/GridChangeStateCommandHandler.java | 2 +- .../service/GridServiceProcessor.java | 6 +- .../processors/task/GridTaskProcessor.java | 2 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 12 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 10 +- .../internal/TestRecordingCommunicationSpi.java | 10 + ...GridManagerLocalMessageListenerSelfTest.java | 4 +- .../cache/IgniteActiveClusterTest.java | 182 --- .../IgniteClusterActivateDeactivateTest.java | 1284 ++++++++++++++++++ ...erActivateDeactivateTestWithPersistence.java | 197 +++ .../IgniteDaemonNodeMarshallerCacheTest.java | 10 - .../pagemem/NoOpPageStoreManager.java | 12 +- .../persistence/pagemem/NoOpWALManager.java | 23 +- .../AbstractNodeJoinTemplate.java | 149 +- .../IgniteChangeGlobalStateAbstractTest.java | 65 +- .../IgniteChangeGlobalStateCacheTest.java | 2 +- ...IgniteChangeGlobalStateDataStreamerTest.java | 5 +- ...gniteChangeGlobalStateDataStructureTest.java | 6 +- .../IgniteChangeGlobalStateFailOverTest.java | 26 +- .../IgniteChangeGlobalStateTest.java | 158 +-- .../IgniteStandByClusterTest.java | 17 +- .../join/JoinActiveNodeToActiveCluster.java | 62 +- ...ctiveNodeToActiveClusterWithPersistence.java | 17 + .../IgniteStandByClientReconnectTest.java | 13 +- ...eStandByClientReconnectToNewClusterTest.java | 13 +- ...cpCommunicationSpiMultithreadedSelfTest.java | 2 +- .../testframework/junits/GridAbstractTest.java | 4 +- .../junits/common/GridCommonAbstractTest.java | 3 + .../testsuites/IgniteStandByClusterSuite.java | 5 +- .../processors/hadoop/HadoopProcessor.java | 4 +- 106 files changed, 4180 insertions(+), 2197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index 0505929..93ffe95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@ -84,9 +84,11 @@ public interface GridComponent { * Callback that notifies that kernal has successfully started, * including all managers and processors. * + * @param active Cluster active flag (note: should be used carefully since state can + * change concurrently). * @throws IgniteCheckedException Thrown in case of any errors. */ - public void onKernalStart() throws IgniteCheckedException; + public void onKernalStart(boolean active) throws IgniteCheckedException; /** * Callback to notify that kernal is about to stop. http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java index cc1ae71..fd59d24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java @@ -61,7 +61,7 @@ public class GridPluginComponent implements GridComponent { } /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { + @Override public void onKernalStart(boolean active) throws IgniteCheckedException { plugin.onIgniteStart(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 31ee3e2..0c17b32 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -98,6 +98,7 @@ import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager; import org.apache.ignite.internal.managers.collision.GridCollisionManager; import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.deployment.GridDeploymentManager; +import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.managers.failover.GridFailoverManager; @@ -207,7 +208,6 @@ import static org.apache.ignite.internal.IgniteComponentType.HADOOP_HELPER; import static org.apache.ignite.internal.IgniteComponentType.IGFS; import static org.apache.ignite.internal.IgniteComponentType.IGFS_HELPER; import static org.apache.ignite.internal.IgniteComponentType.SCHEDULE; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_ACTIVE_ON_START; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_DATE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE; @@ -818,8 +818,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { List<PluginProvider> plugins = U.allPluginProviders(); - final boolean activeOnStart = cfg.isActiveOnStart(); - // Spin out SPIs & managers. try { ctx = new GridKernalContextImpl(log, @@ -994,11 +992,28 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Suggest Operation System optimizations. ctx.performance().addAll(OsConfigurationSuggestions.getSuggestions()); + DiscoveryLocalJoinData joinData = ctx.discovery().localJoin(); + + IgniteInternalFuture<Boolean> transitionWaitFut = joinData.transitionWaitFuture(); + + boolean active; + + if (transitionWaitFut != null) { + if (log.isInfoEnabled()) { + log.info("Join cluster while cluster state transition is in progress, " + + "waiting when transition finish."); + } + + active = transitionWaitFut.get(); + } + else + active = joinData.active(); + // Notify discovery manager the first to make sure that topology is discovered. - ctx.discovery().onKernalStart(); + ctx.discovery().onKernalStart(active); // Notify IO manager the second so further components can send and receive messages. - ctx.io().onKernalStart(); + ctx.io().onKernalStart(active); // Start plugins. for (PluginProvider provider : ctx.plugins().allProviders()) @@ -1021,7 +1036,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (!skipDaemon(comp)) { try { - comp.onKernalStart(); + comp.onKernalStart(active); } catch (IgniteNeedReconnectException e) { assert ctx.discovery().reconnectSupported(); @@ -1486,7 +1501,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { add(ATTR_MARSHALLER_USE_DFLT_SUID, getBoolean(IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID, OptimizedMarshaller.USE_DFLT_SUID)); add(ATTR_LATE_AFFINITY_ASSIGNMENT, cfg.isLateAffinityAssignment()); - add(ATTR_ACTIVE_ON_START, cfg.isActiveOnStart()); if (cfg.getMarshaller() instanceof BinaryMarshaller) { add(ATTR_MARSHALLER_COMPACT_FOOTER, cfg.getBinaryConfiguration() == null ? @@ -3395,7 +3409,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { guard(); try { - return context().state().active(); + return context().state().publicApiActiveState(); } finally { unguard(); @@ -3694,10 +3708,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { * @throws IgniteException if cluster in inActive state */ private void checkClusterState() throws IgniteException { - if (!ctx.state().active()) + if (!ctx.state().publicApiActiveState()) { throw new IgniteException("Can not perform the operation because the cluster is inactive. Note, that " + "the cluster is considered inactive by default if Ignite Persistent Store is used to let all the nodes " + "join the cluster. To activate the cluster call Ignite.activate(true)."); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 7dfeffb..a151eb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -362,7 +362,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan } /** {@inheritDoc} */ - @Override public final void onKernalStart() throws IgniteCheckedException { + @Override public final void onKernalStart(boolean active) throws IgniteCheckedException { for (final IgniteSpi spi : spis) { try { spi.onContextInitialized(new IgniteSpiContext() { http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index 2b3c4fc..4c1077b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; @@ -37,6 +38,9 @@ import org.jetbrains.annotations.Nullable; * */ public class DiscoCache { + /** */ + private final DiscoveryDataClusterState state; + /** Local node. */ private final ClusterNode loc; @@ -78,6 +82,7 @@ public class DiscoCache { private final Set<UUID> alives = new GridConcurrentHashSet<>(); /** + * @param state Current cluster state. * @param loc Local node. * @param rmtNodes Remote nodes. * @param allNodes All nodes. @@ -91,7 +96,9 @@ public class DiscoCache { * @param nodeMap Node map. * @param alives Alive nodes. */ - DiscoCache(ClusterNode loc, + DiscoCache( + DiscoveryDataClusterState state, + ClusterNode loc, List<ClusterNode> rmtNodes, List<ClusterNode> allNodes, List<ClusterNode> srvNodes, @@ -103,6 +110,7 @@ public class DiscoCache { Map<Integer, List<ClusterNode>> cacheGrpAffNodes, Map<UUID, ClusterNode> nodeMap, Set<UUID> alives) { + this.state = state; this.loc = loc; this.rmtNodes = rmtNodes; this.allNodes = allNodes; @@ -117,6 +125,13 @@ public class DiscoCache { this.alives.addAll(alives); } + /** + * @return Current cluster state. + */ + public DiscoveryDataClusterState state() { + return state; + } + /** @return Local node. */ public ClusterNode localNode() { return loc; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryLocalJoinData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryLocalJoinData.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryLocalJoinData.java new file mode 100644 index 0000000..a1f2aa7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryLocalJoinData.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * Information about local join event. + */ +public class DiscoveryLocalJoinData { + /** */ + private final DiscoveryEvent evt; + + /** */ + private final DiscoCache discoCache; + + /** */ + private final AffinityTopologyVersion joinTopVer; + + /** */ + private final IgniteInternalFuture<Boolean> transitionWaitFut; + + /** */ + private final boolean active; + + /** + * @param evt Event. + * @param discoCache Discovery data cache. + * @param transitionWaitFut Future if cluster state transition is in progress. + * @param active Cluster active status. + */ + public DiscoveryLocalJoinData(DiscoveryEvent evt, + DiscoCache discoCache, + @Nullable IgniteInternalFuture<Boolean> transitionWaitFut, + boolean active) { + assert evt != null && evt.topologyVersion() > 0 : evt; + + this.evt = evt; + this.discoCache = discoCache; + this.transitionWaitFut = transitionWaitFut; + this.active = active; + + joinTopVer = new AffinityTopologyVersion(evt.topologyVersion(), 0); + } + + /** + * @return Future if cluster state transition is in progress. + */ + @Nullable public IgniteInternalFuture<Boolean> transitionWaitFuture() { + return transitionWaitFut; + } + + /** + * @return Cluster state. + */ + public boolean active() { + return active; + } + + /** + * @return Event. + */ + public DiscoveryEvent event() { + return evt; + } + + /** + * @return Discovery data cache. + */ + public DiscoCache discoCache() { + return discoCache; + } + + /** + * @return Join topology version. + */ + public AffinityTopologyVersion joinTopologyVersion() { + return joinTopVer; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DiscoveryLocalJoinData.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index c38e37a..9f5bd3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -75,8 +75,11 @@ import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscove import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; -import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; +import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; +import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; @@ -90,7 +93,6 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.P1; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -133,7 +135,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED; import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_ACTIVE_ON_START; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; @@ -144,6 +145,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SECURITY_COMP import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SERVICES_COMPATIBILITY_MODE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_USER_NAME; import static org.apache.ignite.internal.IgniteVersionUtils.VER; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.processors.security.SecurityUtils.SERVICE_PERMISSIONS_SINCE; import static org.apache.ignite.internal.processors.security.SecurityUtils.isSecurityCompatibilityMode; import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.NOOP; @@ -238,7 +240,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private long segChkFreq; /** Local node join to topology event. */ - private GridFutureAdapter<T2<DiscoveryEvent, DiscoCache>> locJoin = new GridFutureAdapter<>(); + private GridFutureAdapter<DiscoveryLocalJoinData> locJoin = new GridFutureAdapter<>(); /** GC CPU load. */ private volatile double gcCpuLoad; @@ -570,7 +572,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (type != EVT_NODE_SEGMENTED && type != EVT_CLIENT_NODE_DISCONNECTED && type != EVT_CLIENT_NODE_RECONNECTED && - type != DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { + type != EVT_DISCOVERY_CUSTOM_EVT) { minorTopVer = 0; verChanged = true; @@ -586,15 +588,50 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { updateClientNodes(node.id()); } + DiscoCache discoCache = null; + + boolean locJoinEvt = type == EVT_NODE_JOINED && node.id().equals(locNode.id()); + + IgniteInternalFuture<Boolean> transitionWaitFut = null; + + ChangeGlobalStateFinishMessage stateFinishMsg = null; + + if (locJoinEvt) { + discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot); + + transitionWaitFut = ctx.state().onLocalJoin(discoCache); + } + else if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT) + stateFinishMsg = ctx.state().onNodeLeft(node); + final AffinityTopologyVersion nextTopVer; - if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { + if (type == EVT_DISCOVERY_CUSTOM_EVT) { assert customMsg != null; - boolean incMinorTopVer = ctx.cache().onCustomEvent( - customMsg, - new AffinityTopologyVersion(topVer, minorTopVer), - node); + boolean incMinorTopVer; + + if (customMsg instanceof ChangeGlobalStateMessage) { + incMinorTopVer = ctx.state().onStateChangeMessage( + new AffinityTopologyVersion(topVer, minorTopVer), + (ChangeGlobalStateMessage)customMsg, + discoCache()); + } + else if (customMsg instanceof ChangeGlobalStateFinishMessage) { + ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg); + + discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot); + + topSnap.set(new Snapshot(topSnap.get().topVer, discoCache)); + + incMinorTopVer = false; + } + else { + incMinorTopVer = ctx.cache().onCustomEvent( + customMsg, + new AffinityTopologyVersion(topVer, minorTopVer), + node); + } if (incMinorTopVer) { minorTopVer++; @@ -603,17 +640,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); - - if (verChanged) - ctx.cache().onDiscoveryEvent(type, node, nextTopVer); } - else { + else nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); - ctx.cache().onDiscoveryEvent(type, node, nextTopVer); - } + ctx.cache().onDiscoveryEvent(type, customMsg, node, nextTopVer, ctx.state().clusterState()); - if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { + if (type == EVT_DISCOVERY_CUSTOM_EVT) { for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) { List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls); @@ -630,13 +663,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - final DiscoCache discoCache; - // Put topology snapshot into discovery history. // There is no race possible between history maintenance and concurrent discovery // event notifications, since SPI notifies manager about all events from this listener. if (verChanged) { - discoCache = createDiscoCache(locNode, topSnapshot); + if (discoCache == null) + discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot); discoCacheHist.put(nextTopVer, discoCache); @@ -650,8 +682,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { // Current version. discoCache = discoCache(); + final DiscoCache discoCache0 = discoCache; + // If this is a local join event, just save it and do not notify listeners. - if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) { + if (locJoinEvt) { if (gridStartTime == 0) gridStartTime = getSpi().getGridStartTime(); @@ -668,7 +702,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { discoEvt.topologySnapshot(topVer, new ArrayList<>(F.view(topSnapshot, FILTER_DAEMON))); - locJoin.onDone(new T2<>(discoEvt, discoCache)); + discoWrk.discoCache = discoCache; + + if (!isLocDaemon && !ctx.clientDisconnected()) + ctx.cache().context().exchange().onLocalJoin(discoEvt, discoCache); + + locJoin.onDone(new DiscoveryLocalJoinData(discoEvt, + discoCache, + transitionWaitFut, + ctx.state().clusterState().active())); return; } @@ -697,7 +739,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { topHist.clear(); topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, - createDiscoCache(locNode, Collections.<ClusterNode>emptySet()))); + createDiscoCache(ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet()))); } else if (type == EVT_CLIENT_NODE_RECONNECTED) { assert locNode.isClient() : locNode; @@ -709,12 +751,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted); + ctx.cache().context().exchange().onLocalJoin(localJoinEvent(), discoCache); + ctx.cluster().clientReconnectFuture().listen(new CI1<IgniteFuture<?>>() { @Override public void apply(IgniteFuture<?> fut) { try { fut.get(); - discoWrk.addEvent(type, nextTopVer, node, discoCache, topSnapshot, null); + discoWrk.addEvent(type, nextTopVer, node, discoCache0, topSnapshot, null); } catch (IgniteException ignore) { // No-op. @@ -727,6 +771,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (type == EVT_CLIENT_NODE_DISCONNECTED || type == EVT_NODE_SEGMENTED || !ctx.clientDisconnected()) discoWrk.addEvent(type, nextTopVer, node, discoCache, topSnapshot, customMsg); + + if (stateFinishMsg != null) + discoWrk.addEvent(EVT_DISCOVERY_CUSTOM_EVT, nextTopVer, node, discoCache, topSnapshot, stateFinishMsg); } }); @@ -826,7 +873,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @return {@code True} if should not process message. */ private boolean skipMessage(int type, @Nullable DiscoveryCustomMessage customMsg) { - if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { + if (type == EVT_DISCOVERY_CUSTOM_EVT) { assert customMsg != null && customMsg.id() != null : customMsg; if (rcvdCustomMsgs.contains(customMsg.id())) { @@ -1157,7 +1204,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { locMarshStrSerVer2; boolean locDelayAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT); - boolean locActiveOnStart = locNode.attribute(ATTR_ACTIVE_ON_START); Boolean locSrvcCompatibilityEnabled = locNode.attribute(ATTR_SERVICES_COMPATIBILITY_MODE); Boolean locSecurityCompatibilityEnabled = locNode.attribute(ATTR_SECURITY_COMPATIBILITY_MODE); @@ -1971,7 +2017,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** @return Event that represents a local node joined to topology. */ public DiscoveryEvent localJoinEvent() { try { - return locJoin.get().get1(); + return locJoin.get().event(); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -1981,7 +2027,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * @return Tuple that consists of a local join event and discovery cache at the join time. */ - public T2<DiscoveryEvent, DiscoCache> localJoin() { + public DiscoveryLocalJoinData localJoin() { try { return locJoin.get(); } @@ -2016,7 +2062,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { public void clientCacheStartEvent(UUID reqId, @Nullable Map<String, DynamicCacheChangeRequest> startReqs, @Nullable Set<String> cachesToClose) { - discoWrk.addEvent(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, + discoWrk.addEvent(EVT_DISCOVERY_CUSTOM_EVT, AffinityTopologyVersion.NONE, localNode(), null, @@ -2098,11 +2144,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * Called from discovery thread. * + * @param state Current state. * @param loc Local node. * @param topSnapshot Topology snapshot. * @return Newly created discovery cache. */ - @NotNull private DiscoCache createDiscoCache(ClusterNode loc, Collection<ClusterNode> topSnapshot) { + @NotNull private DiscoCache createDiscoCache(DiscoveryDataClusterState state, + ClusterNode loc, + Collection<ClusterNode> topSnapshot) { HashSet<UUID> alives = U.newHashSet(topSnapshot.size()); HashMap<UUID, ClusterNode> nodeMap = U.newHashMap(topSnapshot.size()); @@ -2177,6 +2226,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } return new DiscoCache( + state, loc, Collections.unmodifiableList(rmtNodes), Collections.unmodifiableList(allNodes), @@ -2318,7 +2368,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { discoWrk.addEvent(EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, node, - createDiscoCache(node, empty), + createDiscoCache(null, node, empty), empty, null); @@ -2339,6 +2389,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Worker for discovery events. */ private class DiscoveryWorker extends GridWorker { + /** */ + private DiscoCache discoCache; + /** Event queue. */ private final BlockingQueue<GridTuple6<Integer, AffinityTopologyVersion, ClusterNode, DiscoCache, Collection<ClusterNode>, DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>(); @@ -2457,6 +2510,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { boolean segmented = false; + if (evt.get4() != null) + discoCache = evt.get4(); + switch (type) { case EVT_NODE_JOINED: { assert !discoOrdered || topVer.topologyVersion() == node.order() : "Invalid topology version [topVer=" + topVer + @@ -2570,8 +2626,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { break; } - case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: { - if (ctx.event().isRecordable(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)) { + case EVT_DISCOVERY_CUSTOM_EVT: { + if (ctx.event().isRecordable(EVT_DISCOVERY_CUSTOM_EVT)) { DiscoveryCustomEvent customEvt = new DiscoveryCustomEvent(); customEvt.node(ctx.discovery().localNode()); @@ -2581,6 +2637,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { customEvt.affinityTopologyVersion(topVer); customEvt.customMessage(evt.get6()); + if (evt.get4() == null) { + assert discoCache != null : evt.get6(); + + evt.set4(discoCache); + } + ctx.event().record(customEvt, evt.get4()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java index 468d35d..fa6e9e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java @@ -183,11 +183,10 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh public Map<String, StoredCacheData> readCacheConfigurations() throws IgniteCheckedException; /** - * @param grpDesc Cache group descriptor. * @param cacheData Cache configuration. * @throws IgniteCheckedException If failed. */ - public void storeCacheData(CacheGroupDescriptor grpDesc, StoredCacheData cacheData) throws IgniteCheckedException; + public void storeCacheData(StoredCacheData cacheData) throws IgniteCheckedException; /** * @param grpId Cache group ID. * @return {@code True} if index store for given cache group existed before node started. http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java index 690ba0e..d6f78ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java @@ -65,7 +65,7 @@ public abstract class GridProcessorAdapter implements GridProcessor { } /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { + @Override public void onKernalStart(boolean active) throws IgniteCheckedException { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 9516f84..8d08c3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -41,6 +41,7 @@ import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; @@ -52,6 +53,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -108,6 +111,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** */ private final ThreadLocal<ClientCacheChangeDiscoveryMessage> clientCacheChanges = new ThreadLocal<>(); + /** Caches initialized flag (initialized when join activate cluster or after activation. */ + private boolean cachesInitialized; + /** Discovery listener. */ private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -140,10 +146,19 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * Callback invoked from discovery thread when discovery message is received. * * @param type Event type. + * @param customMsg Custom message instance. * @param node Event node. * @param topVer Topology version. + * @param state Cluster state. */ - void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { + void onDiscoveryEvent(int type, + @Nullable DiscoveryCustomMessage customMsg, + ClusterNode node, + AffinityTopologyVersion topVer, + DiscoveryDataClusterState state) { + if (state.transition() || !state.active()) + return; + if (type == EVT_NODE_JOINED && node.isLocal()) { // Clean-up in case of client reconnect. caches.clear(); @@ -153,6 +168,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap lastAffVer = null; caches.init(cctx.cache().cacheGroupDescriptors(), cctx.cache().cacheDescriptors()); + + cachesInitialized = true; + } + else if (customMsg instanceof ChangeGlobalStateFinishMessage) { + if (!cachesInitialized && ((ChangeGlobalStateFinishMessage)customMsg).clusterActive()) { + caches.init(cctx.cache().cacheGroupDescriptors(), cctx.cache().cacheDescriptors()); + + cachesInitialized = true; + } } if (!CU.clientNode(node) && (type == EVT_NODE_FAILED || type == EVT_NODE_JOINED || type == EVT_NODE_LEFT)) { @@ -404,7 +428,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap DynamicCacheChangeRequest startReq = startReqs.get(desc.cacheName()); - cctx.cache().prepareCacheStart(desc, startReq.nearCacheConfiguration(), topVer); + cctx.cache().prepareCacheStart(desc.cacheConfiguration(), + desc, + startReq.nearCacheConfiguration(), + topVer); startedInfos.put(desc.cacheId(), startReq.nearCacheConfiguration() != null); @@ -683,19 +710,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap NearCacheConfiguration nearCfg = null; - if (exchActions.newClusterState() == ClusterState.ACTIVE) { - if (CU.isSystemCache(req.cacheName())) - startCache = true; - else if (!cctx.localNode().isClient()) { - startCache = cctx.cacheContext(action.descriptor().cacheId()) == null && - CU.affinityNode(cctx.localNode(), req.startCacheConfiguration().getNodeFilter()); - - nearCfg = req.nearCacheConfiguration(); - } - else // Only static cache configured on client must be started. - startCache = cctx.kernalContext().state().isLocallyConfigured(req.cacheName()); - } - else if (cctx.localNodeId().equals(req.initiatingNodeId())) { + if (req.locallyConfigured() || (cctx.localNodeId().equals(req.initiatingNodeId()) && !exchActions.activate())) { startCache = true; nearCfg = req.nearCacheConfiguration(); @@ -703,7 +718,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else { // Cache should not be started assert cctx.cacheContext(cacheDesc.cacheId()) == null - : "Starting cache has not null context: " + cacheDesc.cacheName(); + : "Starting cache has not null context: " + cacheDesc.cacheName(); IgniteCacheProxy cacheProxy = cctx.cache().jcacheProxy(req.cacheName()); @@ -711,27 +726,29 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (cacheProxy != null) { // Cache should be in restarting mode assert cacheProxy.isRestarting() - : "Cache has non restarting proxy " + cacheProxy; + : "Cache has non restarting proxy " + cacheProxy; startCache = true; } - else - startCache = CU.affinityNode(cctx.localNode(), cacheDesc.groupDescriptor().config().getNodeFilter()); + else { + startCache = CU.affinityNode(cctx.localNode(), + cacheDesc.groupDescriptor().config().getNodeFilter()); + } } try { // Save configuration before cache started. - if (cctx.pageStore() != null && !cctx.localNode().isClient()) + if (cctx.pageStore() != null && !cctx.kernalContext().clientNode()) { cctx.pageStore().storeCacheData( - cacheDesc.groupDescriptor(), new StoredCacheData(req.startCacheConfiguration()) ); + } if (startCache) { - cctx.cache().prepareCacheStart(cacheDesc, nearCfg, fut.topologyVersion()); - - if (exchActions.newClusterState() == null) - cctx.kernalContext().state().onCacheStart(req); + cctx.cache().prepareCacheStart(req.startCacheConfiguration(), + cacheDesc, + nearCfg, + fut.topologyVersion()); if (fut.cacheAddedOnExchange(cacheDesc.cacheId(), cacheDesc.receivedFrom())) { if (fut.discoCache().cacheGroupAffinityNodes(cacheDesc.groupId()).isEmpty()) http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index c3ddc5f..14eb362 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -683,6 +683,8 @@ public class CacheGroupContext { aff.cancelFutures(err); + preldr.onKernalStop(); + offheapMgr.stop(); ctx.io().removeCacheGroupHandlers(grpId); @@ -853,8 +855,6 @@ public class CacheGroupContext { preldr = new GridCachePreloaderAdapter(this); if (ctx.kernalContext().config().getPersistentStoreConfiguration() != null) { - ClassLoader clsLdr = U.gridClassLoader(); - try { offheapMgr = new GridCacheOffheapManager(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java index a290caf..99b7b1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupData.java @@ -78,8 +78,8 @@ public class CacheGroupData implements Serializable { Map<String, Integer> caches, long flags) { assert cacheCfg != null; - assert grpId != 0; - assert deploymentId != null; + assert grpId != 0 : cacheCfg.getName(); + assert deploymentId != null : cacheCfg.getName(); this.cacheCfg = cacheCfg; this.grpName = grpName; http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ChangeGlobalStateMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ChangeGlobalStateMessage.java deleted file mode 100644 index 4d1a50b..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ChangeGlobalStateMessage.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import java.util.UUID; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.lang.IgniteUuid; -import org.jetbrains.annotations.Nullable; - -/** - * Message represent request for change cluster global state. - */ -public class ChangeGlobalStateMessage implements DiscoveryCustomMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** Custom message ID. */ - private IgniteUuid id = IgniteUuid.randomUuid(); - - /** Request ID */ - private UUID requestId; - - /** Initiator node ID. */ - private UUID initiatingNodeId; - - /** If true activate else deactivate. */ - private boolean activate; - - /** Batch contains all requests for start or stop caches. */ - private DynamicCacheChangeBatch changeGlobalStateBatch; - - /** If happened concurrent activate/deactivate then processed only first message, other message must be skip. */ - private boolean concurrentChangeState; - - /** - * - */ - public ChangeGlobalStateMessage( - UUID requestId, - UUID initiatingNodeId, - boolean activate, - DynamicCacheChangeBatch changeGlobalStateBatch - ) { - this.requestId = requestId; - this.initiatingNodeId = initiatingNodeId; - this.activate = activate; - this.changeGlobalStateBatch = changeGlobalStateBatch; - } - - /** - * - */ - public DynamicCacheChangeBatch getDynamicCacheChangeBatch() { - return changeGlobalStateBatch; - } - - /** {@inheritDoc} */ - @Override public IgniteUuid id() { - return id; - } - - /** {@inheritDoc} */ - @Nullable @Override public DiscoveryCustomMessage ackMessage() { - return !concurrentChangeState ? changeGlobalStateBatch : null; - } - - /** {@inheritDoc} */ - @Override public boolean isMutable() { - return false; - } - - /** - * - */ - public UUID initiatorNodeId() { - return initiatingNodeId; - } - - /** - * - */ - public boolean activate() { - return activate; - } - - /** - * - */ - public UUID requestId() { - return requestId; - } - - /** - * - */ - public void concurrentChangeState() { - this.concurrentChangeState = true; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ChangeGlobalStateMessage.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/1337901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 8f124b2..5452bd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -40,6 +40,9 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; +import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.processors.query.QuerySchema; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; @@ -93,10 +96,13 @@ class ClusterCachesInfo { private List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> locJoinStartCaches; /** */ - private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs; + private Map<String, T2<CacheConfiguration, NearCacheConfiguration>> locCfgsForActivation; /** */ - private volatile Exception onJoinCacheException; + private Map<UUID, CacheClientReconnectDiscoveryData> clientReconnectReqs; + + /** {@code True} if joined cluster while cluster state change was in progress. */ + private boolean joinOnTransition; /** * @param ctx Context. @@ -113,14 +119,25 @@ class ClusterCachesInfo { */ void onStart(CacheJoinNodeDiscoveryData joinDiscoData) throws IgniteCheckedException { this.joinDiscoData = joinDiscoData; - } - /** - * - * @return Exception if cache has conflict. - */ - Exception onJoinCacheException(){ - return onJoinCacheException; + Map<String, CacheConfiguration> grpCfgs = new HashMap<>(); + + for (CacheJoinNodeDiscoveryData.CacheInfo info : joinDiscoData.caches().values()) { + if (info.cacheData().config().getGroupName() == null) + continue; + + CacheConfiguration ccfg = grpCfgs.get(info.cacheData().config().getGroupName()); + + if (ccfg == null) + grpCfgs.put(info.cacheData().config().getGroupName(), info.cacheData().config()); + else + validateCacheGroupConfiguration(ccfg, info.cacheData().config()); + } + + String conflictErr = processJoiningNode(joinDiscoData, ctx.localNodeId(), true); + + if (conflictErr != null) + throw new IgniteCheckedException("Failed to start configured cache. " + conflictErr); } /** @@ -142,7 +159,9 @@ class ClusterCachesInfo { if (gridData != null && gridData.conflictErr != null) throw new IgniteCheckedException(gridData.conflictErr); - if (joinDiscoData != null && gridData != null) { + if (gridData != null && gridData.joinDiscoData != null) { + CacheJoinNodeDiscoveryData joinDiscoData = gridData.joinDiscoData; + for (CacheJoinNodeDiscoveryData.CacheInfo locCacheInfo : joinDiscoData.caches().values()) { CacheConfiguration locCfg = locCacheInfo.cacheData().config(); @@ -165,9 +184,9 @@ class ClusterCachesInfo { } } - joinDiscoData = null; gridData = null; } + /** * Checks that remote caches has configuration compatible with the local. * @@ -308,22 +327,64 @@ class ClusterCachesInfo { } } } - /** * @param batch Cache change request. * @param topVer Topology version. * @return {@code True} if minor topology version should be increased. */ boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVersion topVer) { - ExchangeActions exchangeActions = new ExchangeActions(); + DiscoveryDataClusterState state = ctx.state().clusterState(); + + if (state.active() && !state.transition()) { + ExchangeActions exchangeActions = new ExchangeActions(); + + CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions, + batch.requests(), + topVer, + false); - boolean incMinorTopVer = false; + if (res.needExchange) { + assert !exchangeActions.empty() : exchangeActions; - List<DynamicCacheDescriptor> addedDescs = new ArrayList<>(); + batch.exchangeActions(exchangeActions); + } + + return res.needExchange; + } + else { + IgniteCheckedException err = new IgniteCheckedException("Failed to start/stop cache, cluster state change " + + "is in progress."); + + for (DynamicCacheChangeRequest req : batch.requests()) { + if (req.template()) { + ctx.cache().completeTemplateAddFuture(req.startCacheConfiguration().getName(), + req.deploymentId()); + } + else + ctx.cache().completeCacheStartFuture(req, false, err); + } + + return false; + } + } + + /** + * @param exchangeActions Exchange actions to update. + * @param reqs Requests. + * @param topVer Topology version. + * @param persistedCfgs {@code True} if process start of persisted caches during cluster activation. + * @return Process result. + */ + private CacheChangeProcessResult processCacheChangeRequests( + ExchangeActions exchangeActions, + Collection<DynamicCacheChangeRequest> reqs, + AffinityTopologyVersion topVer, + boolean persistedCfgs) { + CacheChangeProcessResult res = new CacheChangeProcessResult(); final List<T2<DynamicCacheChangeRequest, AffinityTopologyVersion>> reqsToComplete = new ArrayList<>(); - for (DynamicCacheChangeRequest req : batch.requests()) { + for (DynamicCacheChangeRequest req : reqs) { if (req.template()) { CacheConfiguration ccfg = req.startCacheConfiguration(); @@ -347,17 +408,18 @@ class ClusterCachesInfo { assert old == null; - addedDescs.add(templateDesc); + res.addedDescs.add(templateDesc); } - ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId()); + if (!persistedCfgs) + ctx.cache().completeTemplateAddFuture(ccfg.getName(), req.deploymentId()); continue; } assert !req.clientStartOnly() : req; - DynamicCacheDescriptor desc = req.globalStateChange() ? null : registeredCaches.get(req.cacheName()); + DynamicCacheDescriptor desc = registeredCaches.get(req.cacheName()); boolean needExchange = false; @@ -373,22 +435,32 @@ class ClusterCachesInfo { if (conflictErr != null) { U.warn(log, "Ignore cache start request. " + conflictErr); - ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " + - "cache. " + conflictErr)); + IgniteCheckedException err = new IgniteCheckedException("Failed to start " + + "cache. " + conflictErr); + + if (persistedCfgs) + res.errs.add(err); + else + ctx.cache().completeCacheStartFuture(req, false, err); continue; } if (req.clientStartOnly()) { + assert !persistedCfgs; + ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " + "client cache (a cache with the given name is not started): " + req.cacheName())); } else { SchemaOperationException err = QueryUtils.checkQueryEntityConflicts( - req.startCacheConfiguration(), ctx.cache().cacheDescriptors().values()); + req.startCacheConfiguration(), registeredCaches.values()); if (err != null) { - ctx.cache().completeCacheStartFuture(req, false, err); + if (persistedCfgs) + res.errs.add(err); + else + ctx.cache().completeCacheStartFuture(req, false, err); continue; } @@ -430,11 +502,13 @@ class ClusterCachesInfo { ccfg.getName(), ccfg.getNearConfiguration() != null); - ctx.discovery().addClientNode(req.cacheName(), - req.initiatingNodeId(), - req.nearCacheConfiguration() != null); + if (!persistedCfgs) { + ctx.discovery().addClientNode(req.cacheName(), + req.initiatingNodeId(), + req.nearCacheConfiguration() != null); + } - addedDescs.add(startDesc); + res.addedDescs.add(startDesc); exchangeActions.addCacheToStart(req, startDesc); @@ -442,6 +516,7 @@ class ClusterCachesInfo { } } else { + assert !persistedCfgs; assert req.initiatingNodeId() != null : req; if (req.failIfExists()) { @@ -489,8 +564,6 @@ class ClusterCachesInfo { } } } - else if (req.globalStateChange()) - exchangeActions.newClusterState(req.state()); else if (req.resetLostPartitions()) { if (desc != null) { needExchange = true; @@ -559,18 +632,18 @@ class ClusterCachesInfo { assert false : req; if (!needExchange) { - if (!clientCacheStart && req.initiatingNodeId().equals(ctx.localNodeId())) + if (!clientCacheStart && ctx.localNodeId().equals(req.initiatingNodeId())) reqsToComplete.add(new T2<>(req, waitTopVer)); } else - incMinorTopVer = true; + res.needExchange = true; } - if (!F.isEmpty(addedDescs)) { - AffinityTopologyVersion startTopVer = incMinorTopVer ? topVer.nextMinorVersion() : topVer; + if (!F.isEmpty(res.addedDescs)) { + AffinityTopologyVersion startTopVer = res.needExchange ? topVer.nextMinorVersion() : topVer; - for (DynamicCacheDescriptor desc : addedDescs) { - assert desc.template() || incMinorTopVer; + for (DynamicCacheDescriptor desc : res.addedDescs) { + assert desc.template() || res.needExchange; desc.startTopologyVersion(startTopVer); } @@ -602,13 +675,7 @@ class ClusterCachesInfo { }); } - if (incMinorTopVer) { - assert !exchangeActions.empty() : exchangeActions; - - batch.exchangeActions(exchangeActions); - } - - return incMinorTopVer; + return res; } /** @@ -669,7 +736,7 @@ class ClusterCachesInfo { return new CacheClientReconnectDiscoveryData(cacheGrpsInfo, cachesInfo); } else { - assert ctx.config().isDaemon() || joinDiscoData != null || !ctx.state().active(); + assert ctx.config().isDaemon() || joinDiscoData != null; return joinDiscoData; } @@ -720,31 +787,6 @@ class ClusterCachesInfo { return started != null ? started : Collections.<DynamicCacheDescriptor>emptyList(); } - public void addJoinInfo() { - try { - Map<String, CacheConfiguration> grpCfgs = new HashMap<>(); - - for (CacheJoinNodeDiscoveryData.CacheInfo info : joinDiscoData.caches().values()) { - if (info.cacheData().config().getGroupName() == null) - continue; - - CacheConfiguration ccfg = grpCfgs.get(info.cacheData().config().getGroupName()); - - if (ccfg == null) - grpCfgs.put(info.cacheData().config().getGroupName(), info.cacheData().config()); - else - validateCacheGroupConfiguration(ccfg, info.cacheData().config()); - } - - String conflictErr = processJoiningNode(joinDiscoData, ctx.localNodeId(), true); - - if (conflictErr != null) - onJoinCacheException = new IgniteCheckedException("Failed to start configured cache. " + conflictErr); - }catch (IgniteCheckedException e){ - onJoinCacheException = e; - } - } - /** * Discovery event callback, executed from discovery thread. * @@ -771,10 +813,7 @@ class ClusterCachesInfo { if (node.id().equals(ctx.discovery().localNode().id())) { if (gridData == null) { // First node starts. - assert joinDiscoData != null || !ctx.state().active(); - - if (ctx.state().active()) - addJoinInfo(); + assert joinDiscoData != null; initStartCachesForLocalJoin(true); } @@ -864,7 +903,7 @@ class ClusterCachesInfo { if (ctx.isDaemon() || data.commonData() == null) return; - assert joinDiscoData != null || disconnectedState() || !ctx.state().active(); + assert joinDiscoData != null || disconnectedState(); assert data.commonData() instanceof CacheNodeCommonDiscoveryData : data; CacheNodeCommonDiscoveryData cachesData = (CacheNodeCommonDiscoveryData)data.commonData(); @@ -965,7 +1004,7 @@ class ClusterCachesInfo { } } - gridData = new GridData(cachesData, conflictErr); + gridData = new GridData(joinDiscoData, cachesData, conflictErr); if (!disconnectedState()) initStartCachesForLocalJoin(false); @@ -977,11 +1016,20 @@ class ClusterCachesInfo { * @param firstNode {@code True} if first node in cluster starts. */ private void initStartCachesForLocalJoin(boolean firstNode) { - assert locJoinStartCaches == null; + assert F.isEmpty(locJoinStartCaches) : locJoinStartCaches; + + if (ctx.state().clusterState().transition()) { + joinOnTransition = true; - locJoinStartCaches = new ArrayList<>(); + return; + } if (joinDiscoData != null) { + locJoinStartCaches = new ArrayList<>(); + locCfgsForActivation = new HashMap<>(); + + boolean active = ctx.state().clusterState().active(); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { if (firstNode && !joinDiscoData.caches().containsKey(desc.cacheName())) continue; @@ -997,13 +1045,13 @@ class ClusterCachesInfo { DynamicCacheDescriptor desc0 = new DynamicCacheDescriptor(ctx, locCfg.cacheData().config(), - desc.cacheType(), - desc.groupDescriptor(), - desc.template(), - desc.receivedFrom(), - desc.staticallyConfigured(), - desc.sql(), - desc.deploymentId(), + desc.cacheType(), + desc.groupDescriptor(), + desc.template(), + desc.receivedFrom(), + desc.staticallyConfigured(), + desc.sql(), + desc.deploymentId(), new QuerySchema(locCfg.cacheData().queryEntities())); desc0.startTopologyVersion(desc.startTopologyVersion()); @@ -1016,14 +1064,126 @@ class ClusterCachesInfo { if (locCfg != null || joinDiscoData.startCaches() || CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter())) { - // Move system and internal caches first. - if (desc.cacheType().userCache()) - locJoinStartCaches.add(new T2<>(desc, nearCfg)); + if (active) { + // Move system and internal caches first. + if (desc.cacheType().userCache()) + locJoinStartCaches.add(new T2<>(desc, nearCfg)); + else + locJoinStartCaches.add(0, new T2<>(desc, nearCfg)); + } else - locJoinStartCaches.add(0, new T2<>(desc, nearCfg)); + locCfgsForActivation.put(desc.cacheName(), new T2<>(desc.cacheConfiguration(), nearCfg)); + } + } + } + } + + /** + * @param msg Message. + */ + void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) { + if (joinOnTransition) { + initStartCachesForLocalJoin(false); + + joinOnTransition = false; + } + } + + /** + * @param msg Message. + * @param topVer Current topology version. + * @return Exchange action. + * @throws IgniteCheckedException If configuration validation failed. + */ + ExchangeActions onStateChangeRequest(ChangeGlobalStateMessage msg, AffinityTopologyVersion topVer) + throws IgniteCheckedException { + ExchangeActions exchangeActions = new ExchangeActions(); + + if (msg.activate()) { + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + desc.startTopologyVersion(topVer); + + T2<CacheConfiguration, NearCacheConfiguration> locCfg = !F.isEmpty(locCfgsForActivation) ? + locCfgsForActivation.get(desc.cacheName()) : null; + + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(), + desc.cacheName(), + msg.initiatorNodeId()); + + req.startCacheConfiguration(desc.cacheConfiguration()); + req.cacheType(desc.cacheType()); + + if (locCfg != null) { + if (locCfg.get1() != null) + req.startCacheConfiguration(locCfg.get1()); + + req.nearCacheConfiguration(locCfg.get2()); + + req.locallyConfigured(true); + } + + exchangeActions.addCacheToStart(req, desc); + } + + for (CacheGroupDescriptor grpDesc : registeredCacheGroups().values()) + exchangeActions.addCacheGroupToStart(grpDesc); + + List<StoredCacheData> storedCfgs = msg.storedCacheConfigurations(); + + if (storedCfgs != null) { + List<DynamicCacheChangeRequest> reqs = new ArrayList<>(); + + IgniteUuid deplymentId = IgniteUuid.fromUuid(msg.requestId()); + + for (StoredCacheData storedCfg : storedCfgs) { + CacheConfiguration ccfg = storedCfg.config(); + + if (!registeredCaches.containsKey(ccfg.getName())) { + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(msg.requestId(), + ccfg.getName(), + msg.initiatorNodeId()); + + req.deploymentId(deplymentId); + req.startCacheConfiguration(ccfg); + req.cacheType(ctx.cache().cacheType(ccfg.getName())); + req.schema(new QuerySchema(storedCfg.queryEntities())); + + reqs.add(req); + } + } + + CacheChangeProcessResult res = processCacheChangeRequests(exchangeActions, reqs, topVer, true); + + if (!res.errs.isEmpty()) { + IgniteCheckedException err = new IgniteCheckedException("Failed to activate cluster."); + + for (IgniteCheckedException err0 : res.errs) + err.addSuppressed(err0); + + throw err; } } } + else { + locCfgsForActivation = new HashMap<>(); + + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + DynamicCacheChangeRequest req = DynamicCacheChangeRequest.stopRequest(ctx, + desc.cacheName(), + desc.sql(), + false); + + exchangeActions.addCacheToStop(req, desc); + + if (ctx.discovery().cacheClientNode(ctx.discovery().localNode(), desc.cacheName())) + locCfgsForActivation.put(desc.cacheName(), new T2<>((CacheConfiguration)null, (NearCacheConfiguration)null)); + } + + for (CacheGroupDescriptor grpDesc : registeredCacheGroups().values()) + exchangeActions.addCacheGroupToStop(grpDesc, false); + } + + return exchangeActions; } /** @@ -1053,16 +1213,20 @@ class ClusterCachesInfo { * @param clientNodeId Client node ID. */ private void processClientReconnectData(CacheClientReconnectDiscoveryData clientData, UUID clientNodeId) { - for (CacheClientReconnectDiscoveryData.CacheInfo cacheInfo : clientData.clientCaches().values()) { - String cacheName = cacheInfo.config().getName(); + DiscoveryDataClusterState state = ctx.state().clusterState(); + + if (state.active() && !state.transition()) { + for (CacheClientReconnectDiscoveryData.CacheInfo cacheInfo : clientData.clientCaches().values()) { + String cacheName = cacheInfo.config().getName(); - if (surviveReconnect(cacheName)) - ctx.discovery().addClientNode(cacheName, clientNodeId, false); - else { - DynamicCacheDescriptor desc = registeredCaches.get(cacheName); + if (surviveReconnect(cacheName)) + ctx.discovery().addClientNode(cacheName, clientNodeId, false); + else { + DynamicCacheDescriptor desc = registeredCaches.get(cacheName); - if (desc != null && desc.deploymentId().equals(cacheInfo.deploymentId())) - ctx.discovery().addClientNode(cacheName, clientNodeId, cacheInfo.nearCache()); + if (desc != null && desc.deploymentId().equals(cacheInfo.deploymentId())) + ctx.discovery().addClientNode(cacheName, clientNodeId, cacheInfo.nearCache()); + } } } } @@ -1371,6 +1535,7 @@ class ClusterCachesInfo { */ void onDisconnect() { cachesOnDisconnect = new CachesOnDisconnect( + ctx.state().clusterState(), new HashMap<>(registeredCacheGrps), new HashMap<>(registeredCaches)); @@ -1382,57 +1547,82 @@ class ClusterCachesInfo { } /** + * @param active {@code True} if reconnected to active cluster. + * @param transition {@code True} if reconnected while state transition in progress. * @return Information about stopped caches and cache groups. */ - ClusterCachesReconnectResult onReconnected() { + ClusterCachesReconnectResult onReconnected(boolean active, boolean transition) { assert disconnectedState(); Set<String> stoppedCaches = new HashSet<>(); Set<Integer> stoppedCacheGrps = new HashSet<>(); - for (Map.Entry<Integer, CacheGroupDescriptor> e : cachesOnDisconnect.cacheGrps.entrySet()) { - CacheGroupDescriptor locDesc = e.getValue(); - - CacheGroupDescriptor desc; - boolean stopped = true; + if (!active) { + joinOnTransition = transition; - if (locDesc.sharedGroup()) { - desc = cacheGroupByName(locDesc.groupName()); + if (F.isEmpty(locCfgsForActivation)) { + locCfgsForActivation = new HashMap<>(); - if (desc != null && desc.deploymentId().equals(locDesc.deploymentId())) - stopped = false; + for (IgniteInternalCache cache : ctx.cache().caches()) { + locCfgsForActivation.put(cache.name(), + new T2<>((CacheConfiguration)null, cache.configuration().getNearConfiguration())); + } } - else { - desc = nonSharedCacheGroupByCacheName(locDesc.config().getName()); - if (desc != null && - (surviveReconnect(locDesc.config().getName()) || desc.deploymentId().equals(locDesc.deploymentId()))) - stopped = false; - } + for (Map.Entry<Integer, CacheGroupDescriptor> e : cachesOnDisconnect.cacheGrps.entrySet()) + stoppedCacheGrps.add(e.getValue().groupId()); - if (stopped) - stoppedCacheGrps.add(locDesc.groupId()); - else - assert locDesc.groupId() == desc.groupId(); + for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.caches.entrySet()) + stoppedCaches.add(e.getKey()); } + else { + for (Map.Entry<Integer, CacheGroupDescriptor> e : cachesOnDisconnect.cacheGrps.entrySet()) { + CacheGroupDescriptor locDesc = e.getValue(); - for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.caches.entrySet()) { - DynamicCacheDescriptor desc = e.getValue(); + CacheGroupDescriptor desc; + boolean stopped = true; - String cacheName = e.getKey(); + if (locDesc.sharedGroup()) { + desc = cacheGroupByName(locDesc.groupName()); - boolean stopped; + if (desc != null && desc.deploymentId().equals(locDesc.deploymentId())) + stopped = false; + } + else { + desc = nonSharedCacheGroupByCacheName(locDesc.config().getName()); - if (!surviveReconnect(cacheName) || !ctx.state().active()) { - DynamicCacheDescriptor newDesc = registeredCaches.get(cacheName); + if (desc != null && + (surviveReconnect(locDesc.config().getName()) || desc.deploymentId().equals(locDesc.deploymentId()))) + stopped = false; + } - stopped = newDesc == null || !desc.deploymentId().equals(newDesc.deploymentId()); + if (stopped) + stoppedCacheGrps.add(locDesc.groupId()); + else + assert locDesc.groupId() == desc.groupId(); } - else - stopped = false; - if (stopped) - stoppedCaches.add(cacheName); + for (Map.Entry<String, DynamicCacheDescriptor> e : cachesOnDisconnect.caches.entrySet()) { + DynamicCacheDescriptor desc = e.getValue(); + + String cacheName = e.getKey(); + + boolean stopped; + + if (!surviveReconnect(cacheName)) { + DynamicCacheDescriptor newDesc = registeredCaches.get(cacheName); + + stopped = newDesc == null || !desc.deploymentId().equals(newDesc.deploymentId()); + } + else + stopped = false; + + if (stopped) + stoppedCaches.add(cacheName); + } + + if (!cachesOnDisconnect.clusterActive()) + initStartCachesForLocalJoin(false); } if (clientReconnectReqs != null) { @@ -1450,7 +1640,7 @@ class ClusterCachesInfo { /** * @return {@code True} if client node is currently in disconnected state. */ - public boolean disconnectedState() { + private boolean disconnectedState() { return cachesOnDisconnect != null; } @@ -1465,27 +1655,23 @@ class ClusterCachesInfo { /** * */ - void clearCaches() { - registeredCacheGrps.clear(); - - registeredCaches.clear(); - } - - /** - * - */ private static class GridData { /** */ + private final CacheJoinNodeDiscoveryData joinDiscoData; + + /** */ private final CacheNodeCommonDiscoveryData gridData; /** */ private final String conflictErr; /** + * @param joinDiscoData Discovery data collected for local node join. * @param gridData Grid data. * @param conflictErr Cache configuration conflict error. */ - GridData(CacheNodeCommonDiscoveryData gridData, String conflictErr) { + GridData(CacheJoinNodeDiscoveryData joinDiscoData, CacheNodeCommonDiscoveryData gridData, String conflictErr) { + this.joinDiscoData = joinDiscoData; this.gridData = gridData; this.conflictErr = conflictErr; } @@ -1496,18 +1682,46 @@ class ClusterCachesInfo { */ private static class CachesOnDisconnect { /** */ + final DiscoveryDataClusterState state; + + /** */ final Map<Integer, CacheGroupDescriptor> cacheGrps; /** */ final Map<String, DynamicCacheDescriptor> caches; /** + * @param state Cluster state. * @param cacheGrps Cache groups. * @param caches Caches. */ - CachesOnDisconnect(Map<Integer, CacheGroupDescriptor> cacheGrps, Map<String, DynamicCacheDescriptor> caches) { + CachesOnDisconnect(DiscoveryDataClusterState state, + Map<Integer, CacheGroupDescriptor> cacheGrps, + Map<String, DynamicCacheDescriptor> caches) { + this.state = state; this.cacheGrps = cacheGrps; this.caches = caches; } + + /** + * @return {@code True} if cluster was in active state. + */ + boolean clusterActive() { + return state.active() && !state.transition(); + } + } + + /** + * + */ + private static class CacheChangeProcessResult { + /** */ + private boolean needExchange; + + /** */ + private final List<DynamicCacheDescriptor> addedDescs = new ArrayList<>(); + + /** */ + private final List<IgniteCheckedException> errs = new ArrayList<>(); } }
