ignite-gg-12221 more test, collect receive active/inactive flag first.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8100f52b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8100f52b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8100f52b Branch: refs/heads/ignite-5398 Commit: 8100f52b4ee4e2dd783e2d9c000ec8a6f656a7b8 Parents: 5c4d2a7 Author: Dmitriy Govorukhin <[email protected]> Authored: Wed May 31 14:20:56 2017 +0300 Committer: Dmitriy Govorukhin <[email protected]> Committed: Wed May 31 14:20:56 2017 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 24 ++++- .../processors/cache/GridCacheProcessor.java | 107 +++++++++++-------- .../cluster/GridClusterStateProcessor.java | 36 ++++++- 3 files changed, 117 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8100f52b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index f7a82ba..81e5cc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.service.GridServiceProcessor; @@ -88,7 +89,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -662,7 +662,18 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Map<Integer, Serializable> data = new HashMap<>(); + Serializable val = ctx.state().collectDiscoveryData(nodeId); + + int type = ctx.state().discoveryDataType().ordinal(); + + assert val != null; + + data.put(type, val); + for (GridComponent comp : ctx.components()) { + if (comp instanceof GridClusterStateProcessor) + continue; + Serializable compData = comp.collectDiscoveryData(nodeId); if (compData != null) { @@ -676,6 +687,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data) { + GridClusterStateProcessor stateProc = ctx.state(); + + int type = stateProc.discoveryDataType().ordinal(); + + Serializable data0 = data.get(type); + + if (data0 != null) + stateProc.onDiscoveryDataReceived(joiningNodeId, nodeId, data0); + for (Map.Entry<Integer, Serializable> e : data.entrySet()) { GridComponent comp = null; @@ -687,7 +707,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - if (comp != null) + if (comp != null && !(comp instanceof GridClusterStateProcessor)) comp.onDiscoveryDataReceived(joiningNodeId, nodeId, e.getValue()); else { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/8100f52b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 1df7e42..fa9353e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -600,7 +600,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheSharedManager mgr : sharedCtx.managers()) mgr.start(sharedCtx); - //if inActivate on start then skip registrate caches + // If inActivate on start then skip registrate caches. if (!activeOnStart) return; @@ -775,48 +775,45 @@ public class GridCacheProcessor extends GridProcessorAdapter { try { checkConsistency(); - boolean currStatus = ctx.state().active(); + if (ctx.state().onJoinChanged()){ + // Node was active on start and joint -> inActive cluster. + if (ctx.state().changeActiveToInactive()){ - boolean changed = false; - - if (currStatus != activeOnStart) { - activeOnStart = currStatus; + } - changed = true; - } - // If we start as inactive node, and join to active cluster, we must register all caches - // which were received on join. - if (!ctx.isDaemon() && changed) { - List<CacheConfiguration> tmpCacheCfg = new ArrayList<>(); + // Node was inActive on start and joint -> active cluster. + if (ctx.state().changeInActiveToActive()){ + if (!ctx.isDaemon()){ + List<CacheConfiguration> tmpCacheCfg = new ArrayList<>(); - for (CacheConfiguration conf : ctx.config().getCacheConfiguration()) { - for (DynamicCacheDescriptor desc : registeredCaches.values()) { - CacheConfiguration c = desc.cacheConfiguration(); - IgnitePredicate filter = c.getNodeFilter(); + for (CacheConfiguration conf : ctx.config().getCacheConfiguration()) { + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + CacheConfiguration c = desc.cacheConfiguration(); + IgnitePredicate filter = c.getNodeFilter(); - if (c.getName().equals(conf.getName()) && - ((desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter)) || - CU.isSystemCache(c.getName()))) { + if (c.getName().equals(conf.getName()) && + ((desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter)) || + CU.isSystemCache(c.getName()))) { - if (CU.isSystemCache(c.getName())) - desc.locallyConfigured(true); + if (CU.isSystemCache(c.getName())) + desc.locallyConfigured(true); - tmpCacheCfg.add(c); + tmpCacheCfg.add(c); - break; + break; + } + } } - } - } - if (!tmpCacheCfg.isEmpty()) { - CacheConfiguration[] newCacheCfg = new CacheConfiguration[tmpCacheCfg.size()]; + if (!tmpCacheCfg.isEmpty()) { + CacheConfiguration[] newCacheCfg = new CacheConfiguration[tmpCacheCfg.size()]; - tmpCacheCfg.toArray(newCacheCfg); + tmpCacheCfg.toArray(newCacheCfg); - ctx.config().setCacheConfiguration(newCacheCfg); + ctx.config().setCacheConfiguration(newCacheCfg); + } + } } - - activeOnStart = currStatus; } if (activeOnStart && !ctx.clientNode() && !ctx.isDaemon()) @@ -876,7 +873,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { mgr.onKernalStart(false); // Escape if start active on start false - if (!activeOnStart) + if (!ctx.state().active()) return; for (GridCacheAdapter<?, ?> cache : caches.values()) @@ -2105,9 +2102,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) { - if (!sharedCtx.kernalContext().state().active()) - return new DynamicCacheChangeBatch(Collections.<DynamicCacheChangeRequest>emptyList()) - .restartingCaches(Collections.<String>emptySet()); + if (!sharedCtx.kernalContext().state().active()) { + if (ctx.localNodeId().equals(nodeId)) { + CacheConfiguration[] cfgs = sharedCtx.gridConfig().getCacheConfiguration(); + + List<DynamicCacheChangeRequest> reqs = new ArrayList<>(cfgs.length); + + try { + for (CacheConfiguration cfg : cfgs) + reqs.add(createRequest(cfg, true)); + } + catch (Exception e) { + // Todo + } + + return new DynamicCacheChangeBatch(reqs).restartingCaches(Collections.<String>emptySet()); + } + } boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null; @@ -2214,21 +2225,25 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) { - if (!ctx.state().active()) { - if (!ctx.localNodeId().equals(joiningNodeId)){ - if (data instanceof DynamicCacheChangeBatch) - onJoinBatches.put(rmtNodeId, (DynamicCacheChangeBatch)data); + if (ctx.state().onJoinChanged()){ + if (ctx.state().changeActiveToInactive()){ + if (ctx.localNodeId().equals(joiningNodeId)) { + for (DynamicCacheDescriptor desc : registeredCaches.values()) + ctx.discovery().removeCacheFilter(desc.cacheConfiguration().getName()); + + registeredCaches.clear(); + registeredTemplates.clear(); + } return; - }else { - registeredCaches.clear(); - registeredTemplates.clear(); + } + } - for (DynamicCacheDescriptor desc : registeredCaches.values()) - ctx.discovery().removeCacheFilter(desc.cacheConfiguration().getName()); + if (!ctx.state().active()) { + if (data instanceof DynamicCacheChangeBatch) + onJoinBatches.put(rmtNodeId, (DynamicCacheChangeBatch)data); - registeredCaches.clear(); - } + return; } if (data instanceof DynamicCacheChangeBatch) { http://git-wip-us.apache.org/repos/asf/ignite/blob/8100f52b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index a7c5cb8..3ffbda9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -130,7 +130,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { @Override public void start(boolean activeOnStart) throws IgniteCheckedException { super.start(activeOnStart); - globalState = activeOnStart ? ACTIVE : INACTIVE; + globalState = state(activeOnStart); cacheProc = ctx.cache(); sharedCtx = cacheProc.context(); @@ -215,11 +215,15 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) { + assert globalState != null; + return globalState; } /** {@inheritDoc} */ @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) { + assert data != null; + if (ctx.localNodeId().equals(joiningNodeId)) globalState = (ClusterState)data; } @@ -313,6 +317,34 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { } /** + * @param flag Flag. + */ + private ClusterState state(boolean flag){ + return flag ? ACTIVE : INACTIVE; + } + + /** + * + */ + public boolean onJoinChanged() { + return state(sharedCtx.gridConfig().isActiveOnStart()) != globalState; + } + + /** + * + */ + public boolean changeActiveToInactive(){ + return state(sharedCtx.gridConfig().isActiveOnStart()) == ACTIVE && globalState == INACTIVE; + } + + /** + * + */ + public boolean changeInActiveToActive(){ + return state(sharedCtx.gridConfig().isActiveOnStart()) == INACTIVE && globalState == ACTIVE; + } + + /** * */ public boolean active() { @@ -406,7 +438,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter { //todo revert change if deactivate request fail } - globalState = actx.activate ? INACTIVE : ACTIVE; + globalState = state(actx.activate); GridChangeGlobalStateFuture af = cgsLocFut.get();
