ignite-gg-12221 Reconnecting a previously active client to an inactive cluster 
+ tests


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5c4d2a77
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5c4d2a77
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5c4d2a77

Branch: refs/heads/ignite-5398
Commit: 5c4d2a7729a2ea8b5e3f7b24a3a7e1f537b6b4da
Parents: 918cd3e
Author: Dmitriy Govorukhin <[email protected]>
Authored: Tue May 30 19:51:07 2017 +0300
Committer: Dmitriy Govorukhin <[email protected]>
Committed: Tue May 30 19:51:07 2017 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  5 +-
 .../cache/DynamicCacheChangeBatch.java          |  4 +-
 .../processors/cache/GridCacheProcessor.java    | 55 +++++++++++++++++---
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  4 +-
 .../testframework/junits/GridAbstractTest.java  |  7 +++
 5 files changed, 64 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5c4d2a77/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 8703e29..f7a82ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1065,7 +1065,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             locMarshStrSerVer2;
 
         boolean locDelayAssign = 
locNode.attribute(ATTR_LATE_AFFINITY_ASSIGNMENT);
-        boolean locActiveOnStart = locNode.attribute(ATTR_ACTIVE_ON_START);
+
 
         Boolean locSrvcCompatibilityEnabled = 
locNode.attribute(ATTR_SERVICES_COMPATIBILITY_MODE);
 
@@ -1158,6 +1158,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     ", rmtAddrs=" + U.addressesAsString(n) + ']');
             }
 
+           /* boolean locActiveOnStart = 
locNode.attribute(ATTR_ACTIVE_ON_START);
             boolean rmtActiveOnStart = n.attribute(ATTR_ACTIVE_ON_START);
 
             if (locActiveOnStart != rmtActiveOnStart) {
@@ -1167,7 +1168,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     ", rmtId8=" + U.id8(n.id()) +
                     ", rmtActiveOnStart=" + rmtActiveOnStart +
                     ", rmtAddrs=" + U.addressesAsString(n) + ']');
-            }
+            }*/
 
             if 
(n.version().compareToIgnoreTimestamp(GridServiceProcessor.LAZY_SERVICES_CFG_SINCE)
 >= 0) {
                 Boolean rmtSrvcCompatibilityEnabled = 
n.attribute(ATTR_SERVICES_COMPATIBILITY_MODE);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5c4d2a77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index 7804240..12b7e12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -133,8 +133,10 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     /**
      * @param caches restarting caches.
      */
-    public void restartingCaches(Set<String> caches) {
+    public DynamicCacheChangeBatch restartingCaches(Set<String> caches) {
         this.restartingCaches = caches;
+
+        return this;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5c4d2a77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 96912dc..1df7e42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -196,6 +196,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /** Cache templates. */
     private ConcurrentMap<String, DynamicCacheDescriptor> registeredTemplates 
= new ConcurrentHashMap<>();
 
+    /** On join batches. */
+    private ConcurrentMap<UUID, DynamicCacheChangeBatch> onJoinBatches = new 
ConcurrentHashMap<>();
+
     /** */
     private IdentityHashMap<CacheStore, ThreadLocal> sesHolders = new 
IdentityHashMap<>();
 
@@ -774,9 +777,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             boolean currStatus = ctx.state().active();
 
+            boolean changed = false;
+
+            if (currStatus != activeOnStart) {
+                activeOnStart = currStatus;
+
+                changed = true;
+            }
             // If we start as inactive node, and join to active cluster, we 
must register all caches
             // which were received on join.
-            if (!ctx.isDaemon() && currStatus && !activeOnStart) {
+            if (!ctx.isDaemon() && changed) {
                 List<CacheConfiguration> tmpCacheCfg = new ArrayList<>();
 
                 for (CacheConfiguration conf : 
ctx.config().getCacheConfiguration()) {
@@ -788,6 +798,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                             ((desc.receivedOnDiscovery() && 
CU.affinityNode(locNode, filter)) ||
                                 CU.isSystemCache(c.getName()))) {
 
+                            if (CU.isSystemCache(c.getName()))
+                                desc.locallyConfigured(true);
+
                             tmpCacheCfg.add(c);
 
                             break;
@@ -825,8 +838,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 boolean loc = desc.locallyConfigured();
 
-                if (loc || (desc.receivedOnDiscovery() && 
(CU.affinityNode(locNode, filter) ||
-                    startAllCachesOnClientStart()))) {
+                if (loc || (desc.receivedOnDiscovery() && 
(CU.affinityNode(locNode, filter) || startAllCachesOnClientStart()))) {
                     boolean started = desc.onStart();
 
                     assert started : "Failed to change started flag for 
locally configured cache: " + desc;
@@ -1114,17 +1126,19 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             else
                 stopped = false;
 
-            if (stopped) {
+            if (true) {
                 cache.context().gate().reconnected(true);
 
                 sharedCtx.removeCacheContext(cache.ctx);
 
                 caches.remove(maskNull(cache.name()));
+
                 jCacheProxies.remove(maskNull(cache.name()));
 
                 IgniteInternalFuture<?> fut = ctx.closure().runLocalSafe(new 
Runnable() {
                     @Override public void run() {
                         onKernalStop(cache, true);
+
                         stopCache(cache, true, false);
                     }
                 });
@@ -2091,6 +2105,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
+        if (!sharedCtx.kernalContext().state().active())
+            return new 
DynamicCacheChangeBatch(Collections.<DynamicCacheChangeRequest>emptyList())
+                .restartingCaches(Collections.<String>emptySet());
+
         boolean reconnect = ctx.localNodeId().equals(nodeId) && 
cachesOnDisconnect != null;
 
         // Collect dynamically started caches to a single object.
@@ -2196,6 +2214,23 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID 
rmtNodeId, Serializable data) {
+        if (!ctx.state().active()) {
+            if (!ctx.localNodeId().equals(joiningNodeId)){
+                if (data instanceof DynamicCacheChangeBatch)
+                    onJoinBatches.put(rmtNodeId, 
(DynamicCacheChangeBatch)data);
+
+                return;
+            }else {
+                registeredCaches.clear();
+                registeredTemplates.clear();
+
+                for (DynamicCacheDescriptor desc : registeredCaches.values())
+                    
ctx.discovery().removeCacheFilter(desc.cacheConfiguration().getName());
+
+                registeredCaches.clear();
+            }
+        }
+
         if (data instanceof DynamicCacheChangeBatch) {
             DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data;
 
@@ -2728,6 +2763,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     public Collection<DynamicCacheChangeRequest> startAllCachesRequests() 
throws IgniteCheckedException {
         List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
 
+        List<CacheConfiguration> cfgs = new ArrayList();
+
+        Collections.addAll(cfgs, ctx.config().getCacheConfiguration());
+
+        for (DynamicCacheChangeBatch batch : onJoinBatches.values())
+            for (DynamicCacheChangeRequest req : batch.requests())
+                cfgs.add(req.startCacheConfiguration());
+
         if (!ctx.config().isDaemon() &&
             sharedCtx.pageStore() != null &&
             sharedCtx.database().persistenceEnabled()) {
@@ -2740,13 +2783,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                     reqs.add(createRequest(cfg, false));
             }
 
-            for (CacheConfiguration cfg : 
ctx.config().getCacheConfiguration()) {
+            for (CacheConfiguration cfg : cfgs) {
                 if (!savedCacheNames.contains(cfg.getName()))
                     reqs.add(createRequest(cfg, true));
             }
         }
         else {
-            for (CacheConfiguration cfg : ctx.config().getCacheConfiguration())
+            for (CacheConfiguration cfg : cfgs)
                 reqs.add(createRequest(cfg, true));
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5c4d2a77/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 742227e..14746a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -3688,7 +3688,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     return;
                 }
 
-                boolean locActiveOnStart = booleanAttribute(locNode, 
ATTR_ACTIVE_ON_START, true);
+               /* boolean locActiveOnStart = booleanAttribute(locNode, 
ATTR_ACTIVE_ON_START, true);
                 boolean rmtActiveOnStart = booleanAttribute(node, 
ATTR_ACTIVE_ON_START, true);
 
                 if (locActiveOnStart != rmtActiveOnStart) {
@@ -3712,7 +3712,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     // Ignore join request.
                     return;
-                }
+                }*/
 
                 final Boolean locSrvcCompatibilityEnabled = 
locNode.attribute(ATTR_SERVICES_COMPATIBILITY_MODE);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5c4d2a77/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 95df9df..1711767 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1084,6 +1084,13 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /**
+     * @param cfg Config.
+     */
+    protected Ignite startGrid(IgniteConfiguration cfg) throws Exception {
+        return startGrid(cfg.getGridName(), cfg);
+    }
+
+    /**
      * Loads configuration from the given Spring XML file.
      *
      * @param springCfgPath Path to file.

Reply via email to