ignite-gg-12221 Reconnecting a previously active client to an inactive cluster + tests
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5c4d2a77 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5c4d2a77 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5c4d2a77 Branch: refs/heads/ignite-5398 Commit: 5c4d2a7729a2ea8b5e3f7b24a3a7e1f537b6b4da Parents: 918cd3e Author: Dmitriy Govorukhin <[email protected]> Authored: Tue May 30 19:51:07 2017 +0300 Committer: Dmitriy Govorukhin <[email protected]> Committed: Tue May 30 19:51:07 2017 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 5 +- .../cache/DynamicCacheChangeBatch.java | 4 +- .../processors/cache/GridCacheProcessor.java | 55 +++++++++++++++++--- .../ignite/spi/discovery/tcp/ServerImpl.java | 4 +- .../testframework/junits/GridAbstractTest.java | 7 +++ 5 files changed, 64 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5c4d2a77/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 8703e29..f7a82ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -1065,7 +1065,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { locMarshStrSerVer2; boolean locDelayAssign = locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT); - boolean locActiveOnStart = locNode.attribute(ATTR_ACTIVE_ON_START); + Boolean locSrvcCompatibilityEnabled = locNode.attribute(ATTR_SERVICES_COMPATIBILITY_MODE); @@ -1158,6 +1158,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ", rmtAddrs=" + U.addressesAsString(n) + ']'); } + /* boolean locActiveOnStart = locNode.attribute(ATTR_ACTIVE_ON_START); boolean rmtActiveOnStart = n.attribute(ATTR_ACTIVE_ON_START); if (locActiveOnStart != rmtActiveOnStart) { @@ -1167,7 +1168,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ", rmtId8=" + U.id8(n.id()) + ", rmtActiveOnStart=" + rmtActiveOnStart + ", rmtAddrs=" + U.addressesAsString(n) + ']'); - } + }*/ if (n.version().compareToIgnoreTimestamp(GridServiceProcessor.LAZY_SERVICES_CFG_SINCE) >= 0) { Boolean rmtSrvcCompatibilityEnabled = n.attribute(ATTR_SERVICES_COMPATIBILITY_MODE); http://git-wip-us.apache.org/repos/asf/ignite/blob/5c4d2a77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index 7804240..12b7e12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -133,8 +133,10 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { /** * @param caches restarting caches. */ - public void restartingCaches(Set<String> caches) { + public DynamicCacheChangeBatch restartingCaches(Set<String> caches) { this.restartingCaches = caches; + + return this; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5c4d2a77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 96912dc..1df7e42 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -196,6 +196,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Cache templates. */ private ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates = new ConcurrentHashMap<>(); + /** On join batches. */ + private ConcurrentMap<UUID, DynamicCacheChangeBatch> onJoinBatches = new ConcurrentHashMap<>(); + /** */ private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new IdentityHashMap<>(); @@ -774,9 +777,16 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean currStatus = ctx.state().active(); + boolean changed = false; + + if (currStatus != activeOnStart) { + activeOnStart = currStatus; + + changed = true; + } // If we start as inactive node, and join to active cluster, we must register all caches // which were received on join. - if (!ctx.isDaemon() && currStatus && !activeOnStart) { + if (!ctx.isDaemon() && changed) { List<CacheConfiguration> tmpCacheCfg = new ArrayList<>(); for (CacheConfiguration conf : ctx.config().getCacheConfiguration()) { @@ -788,6 +798,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { ((desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter)) || CU.isSystemCache(c.getName()))) { + if (CU.isSystemCache(c.getName())) + desc.locallyConfigured(true); + tmpCacheCfg.add(c); break; @@ -825,8 +838,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean loc = desc.locallyConfigured(); - if (loc || (desc.receivedOnDiscovery() && (CU.affinityNode(locNode, filter) || - startAllCachesOnClientStart()))) { + if (loc || (desc.receivedOnDiscovery() && (CU.affinityNode(locNode, filter) || startAllCachesOnClientStart()))) { boolean started = desc.onStart(); assert started : "Failed to change started flag for locally configured cache: " + desc; @@ -1114,17 +1126,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { else stopped = false; - if (stopped) { + if (true) { cache.context().gate().reconnected(true); sharedCtx.removeCacheContext(cache.ctx); caches.remove(maskNull(cache.name())); + jCacheProxies.remove(maskNull(cache.name())); IgniteInternalFuture<?> fut = ctx.closure().runLocalSafe(new Runnable() { @Override public void run() { onKernalStop(cache, true); + stopCache(cache, true, false); } }); @@ -2091,6 +2105,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) { + if (!sharedCtx.kernalContext().state().active()) + return new DynamicCacheChangeBatch(Collections.<DynamicCacheChangeRequest>emptyList()) + .restartingCaches(Collections.<String>emptySet()); + boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null; // Collect dynamically started caches to a single object. @@ -2196,6 +2214,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable data) { + if (!ctx.state().active()) { + if (!ctx.localNodeId().equals(joiningNodeId)){ + if (data instanceof DynamicCacheChangeBatch) + onJoinBatches.put(rmtNodeId, (DynamicCacheChangeBatch)data); + + return; + }else { + registeredCaches.clear(); + registeredTemplates.clear(); + + for (DynamicCacheDescriptor desc : registeredCaches.values()) + ctx.discovery().removeCacheFilter(desc.cacheConfiguration().getName()); + + registeredCaches.clear(); + } + } + if (data instanceof DynamicCacheChangeBatch) { DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data; @@ -2728,6 +2763,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { public Collection<DynamicCacheChangeRequest> startAllCachesRequests() throws IgniteCheckedException { List<DynamicCacheChangeRequest> reqs = new ArrayList<>(); + List<CacheConfiguration> cfgs = new ArrayList(); + + Collections.addAll(cfgs, ctx.config().getCacheConfiguration()); + + for (DynamicCacheChangeBatch batch : onJoinBatches.values()) + for (DynamicCacheChangeRequest req : batch.requests()) + cfgs.add(req.startCacheConfiguration()); + if (!ctx.config().isDaemon() && sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled()) { @@ -2740,13 +2783,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { reqs.add(createRequest(cfg, false)); } - for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { + for (CacheConfiguration cfg : cfgs) { if (!savedCacheNames.contains(cfg.getName())) reqs.add(createRequest(cfg, true)); } } else { - for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) + for (CacheConfiguration cfg : cfgs) reqs.add(createRequest(cfg, true)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5c4d2a77/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 742227e..14746a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3688,7 +3688,7 @@ class ServerImpl extends TcpDiscoveryImpl { return; } - boolean locActiveOnStart = booleanAttribute(locNode, ATTR_ACTIVE_ON_START, true); + /* boolean locActiveOnStart = booleanAttribute(locNode, ATTR_ACTIVE_ON_START, true); boolean rmtActiveOnStart = booleanAttribute(node, ATTR_ACTIVE_ON_START, true); if (locActiveOnStart != rmtActiveOnStart) { @@ -3712,7 +3712,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Ignore join request. return; - } + }*/ final Boolean locSrvcCompatibilityEnabled = locNode.attribute(ATTR_SERVICES_COMPATIBILITY_MODE); http://git-wip-us.apache.org/repos/asf/ignite/blob/5c4d2a77/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 95df9df..1711767 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -1084,6 +1084,13 @@ public abstract class GridAbstractTest extends TestCase { } /** + * @param cfg Config. + */ + protected Ignite startGrid(IgniteConfiguration cfg) throws Exception { + return startGrid(cfg.getGridName(), cfg); + } + + /** * Loads configuration from the given Spring XML file. * * @param springCfgPath Path to file.
