Repository: ignite
Updated Branches:
  refs/heads/ignite-1741 [created] 37fedb351


ignite-1741


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/37fedb35
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/37fedb35
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/37fedb35

Branch: refs/heads/ignite-1741
Commit: 37fedb3510a1a1a0024607d2c4a70a2c34439837
Parents: 388f7ff
Author: sboikov <[email protected]>
Authored: Wed Dec 19 00:12:21 2018 +0300
Committer: sboikov <[email protected]>
Committed: Wed Dec 19 00:12:21 2018 +0300

----------------------------------------------------------------------
 .../internal/managers/discovery/DiscoCache.java |  8 ++
 .../discovery/GridDiscoveryManager.java         | 22 ++++-
 .../affinity/GridAffinityProcessor.java         | 96 +++++++++-----------
 .../cache/CacheAffinityCallSelfTest.java        |  4 +-
 4 files changed, 70 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/37fedb35/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index e99e478..2d6eeb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -328,6 +328,14 @@ public class DiscoCache {
     }
 
     /**
+     * @param cacheName Cache name.
+     * @return {@code True} if cache with given name exists.
+     */
+    public boolean cacheStarted(String cacheName) {
+        return allCacheNodes.containsKey(CU.cacheId(cacheName));
+    }
+
+    /**
      * Gets all nodes that have cache with given name.
      *
      * @param cacheName Cache name.

http://git-wip-us.apache.org/repos/asf/ignite/blob/37fedb35/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index bbe0c78..ba8861e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2465,7 +2465,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @param cacheName Cache name.
      * @param rich Node to add
      */
-    private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) {
+    private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode node) {
         List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName));
 
         if (cacheNodes == null) {
@@ -2474,7 +2474,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             cacheMap.put(CU.cacheId(cacheName), cacheNodes);
         }
 
-        cacheNodes.add(rich);
+        cacheNodes.add(node);
     }
 
     /**
@@ -3420,6 +3420,24 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 }
             }
         }
+
+        if (cacheGrpAffNodes.size() != registeredCacheGrps.size()) {
+            for (Map.Entry<Integer, CacheGroupAffinity> e : registeredCacheGrps.entrySet()) {
+                Integer grpId = e.getKey();
+
+                if (!cacheGrpAffNodes.containsKey(grpId))
+                    cacheGrpAffNodes.put(grpId, Collections.emptyList());
+            }
+        }
+
+        if (allCacheNodes.size() != registeredCaches.size()) {
+            for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) {
+                Integer cacheId = CU.cacheId(entry.getKey());
+
+                if (!allCacheNodes.containsKey(cacheId))
+                    allCacheNodes.put(cacheId, Collections.emptyList());
+            }
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/37fedb35/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 30bf113..cddd3c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -31,21 +31,25 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -80,12 +84,6 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
     /** Affinity map cleanup delay (ms). */
     private static final long AFFINITY_MAP_CLEAN_UP_DELAY = 3000;
 
-    /** Retries to get affinity in case of error. */
-    private static final int ERROR_RETRIES = 3;
-
-    /** Time to wait between errors (in milliseconds). */
-    private static final long ERROR_WAIT = 500;
-
     /** Log. */
     private final IgniteLogger log;
 
@@ -442,86 +440,74 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
             }
         }
 
-        Collection<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, topVer);
-
-        if (F.isEmpty(cacheNodes))
-            return null;
-
-        GridFutureAdapter<AffinityInfo> fut0 = new GridFutureAdapter<>();
-
-        IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0);
+        DiscoCache discoCache = ctx.discovery().discoCache(topVer);
 
-        if (old != null)
-            return old.get();
+        if (discoCache == null)
+            throw new IgniteCheckedException("Failed to resolve cluster topology: " + topVer);
 
-        int max = ERROR_RETRIES;
-        int cnt = 0;
+        if (!discoCache.cacheStarted(cacheName))
+            return null;
 
-        Iterator<ClusterNode> it = cacheNodes.iterator();
+        DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheName);
 
-        // We are here because affinity has not been fetched yet, or cache mode is LOCAL.
-        while (true) {
-            cnt++;
+        if (desc == null) {
+            if (ctx.clientDisconnected())
+                throw new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+                        "Failed to get affinity mapping, client disconnected.");
 
-            if (!it.hasNext())
-                it = cacheNodes.iterator();
+            return null;
+        }
 
-            // Double check since we deal with dynamic view.
-            if (!it.hasNext())
-                // Exception will be caught in this method.
-                throw new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName);
+        if (desc.cacheConfiguration().getCacheMode() == LOCAL)
+            throw new IgniteCheckedException("Failed to map keys for LOCAL cache: " + cacheName);
 
-            ClusterNode n = it.next();
+        GridFutureAdapter<AffinityInfo> fut0 = new GridFutureAdapter<>();
 
-            CacheMode mode = ctx.cache().cacheMode(cacheName);
+        IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0);
 
-            if (mode == null) {
-                if (ctx.clientDisconnected())
-                    throw new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
-                            "Failed to get affinity mapping, client disconnected.");
+        if (old != null)
+            return old.get();
 
-                throw new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName);
-            }
+        List<ClusterNode> cacheNodes = discoCache.cacheNodes(cacheName);
 
-            // Map all keys to a single node, if the cache mode is LOCAL.
-            if (mode == LOCAL) {
-                fut0.onDone(new IgniteCheckedException("Failed to map keys for LOCAL cache."));
+        for (int i = 0; i < cacheNodes.size(); i++) {
+            ClusterNode node = cacheNodes.get(i);
 
-                // Will throw exception.
-                fut0.get();
-            }
+            if (!discoCache.alive(node.id()))
+                continue;
 
             try {
-                // Resolve cache context for remote node.
-                // Set affinity function before counting down on latch.
-                fut0.onDone(affinityInfoFromNode(cacheName, topVer, n));
+                fut0.onDone(affinityInfoFromNode(cacheName, topVer, node));
 
                 break;
             }
             catch (IgniteCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to get affinity from node (will retry) [cache=" + cacheName +
-                        ", node=" + U.toShortString(n) + ", msg=" + e.getMessage() + ']');
-
-                if (cnt < max) {
-                    U.sleep(ERROR_WAIT);
+                if (e instanceof ClusterTopologyCheckedException || X.hasCause(e, ClusterTopologyException.class)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to get affinity from node, node failed [cache=" + cacheName +
+                                ", node=" + node.id() + ", msg=" + e.getMessage() + ']');
 
                     continue;
                 }
 
-                affMap.remove(key, fut0);
+                if (log.isDebugEnabled())
+                    log.debug("Failed to get affinity from node [cache=" + cacheName +
+                        ", node=" + node.id() + ", msg=" + e.getMessage() + ']');
 
-                fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e));
+                fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + node.id(), e));
 
                 break;
             }
             catch (RuntimeException | Error e) {
-                fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e));
+                fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + node.id(), e));
 
                 break;
             }
         }
 
+        if (!fut0.isDone())
+            fut0.onDone(new ClusterGroupEmptyCheckedException("Failed to get cache affinity, all cache nodes failed: " + cacheName));
+
         return fut0.get();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/37fedb35/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
index baaf503..f1d4740 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
@@ -154,8 +154,6 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
      */
     @Test
     public void testAffinityCallNoServerNode() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1741");
-
         startGridsMultiThreaded(SRVS + 1);
 
         final Integer key = 1;
@@ -165,7 +163,7 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
         assertTrue(client.configuration().isClientMode());
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
-            @Override public Object call() throws Exception {
+            @Override public Object call() {
                 for (int i = 0; i < SRVS; ++i)
                     stopGrid(i, false);
 

Reply via email to