Repository: ignite Updated Branches: refs/heads/ignite-1741 [created] 37fedb351
ignite-1741 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/37fedb35 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/37fedb35 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/37fedb35 Branch: refs/heads/ignite-1741 Commit: 37fedb3510a1a1a0024607d2c4a70a2c34439837 Parents: 388f7ff Author: sboikov <[email protected]> Authored: Wed Dec 19 00:12:21 2018 +0300 Committer: sboikov <[email protected]> Committed: Wed Dec 19 00:12:21 2018 +0300 ---------------------------------------------------------------------- .../internal/managers/discovery/DiscoCache.java | 8 ++ .../discovery/GridDiscoveryManager.java | 22 ++++- .../affinity/GridAffinityProcessor.java | 96 +++++++++----------- .../cache/CacheAffinityCallSelfTest.java | 4 +- 4 files changed, 70 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/37fedb35/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index e99e478..2d6eeb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -328,6 +328,14 @@ public class DiscoCache { } /** + * @param cacheName Cache name. + * @return {@code True} if cache with given name exists. + */ + public boolean cacheStarted(String cacheName) { + return allCacheNodes.containsKey(CU.cacheId(cacheName)); + } + + /** * Gets all nodes that have cache with given name. * * @param cacheName Cache name. http://git-wip-us.apache.org/repos/asf/ignite/blob/37fedb35/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index bbe0c78..ba8861e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -2465,7 +2465,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param cacheName Cache name. * @param rich Node to add */ - private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) { + private void addToMap(Map<Integer, List<ClusterNode>> cacheMap, String cacheName, ClusterNode node) { List<ClusterNode> cacheNodes = cacheMap.get(CU.cacheId(cacheName)); if (cacheNodes == null) { @@ -2474,7 +2474,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { cacheMap.put(CU.cacheId(cacheName), cacheNodes); } - cacheNodes.add(rich); + cacheNodes.add(node); } /** @@ -3420,6 +3420,24 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } } + + if (cacheGrpAffNodes.size() != registeredCacheGrps.size()) { + for (Map.Entry<Integer, CacheGroupAffinity> e : registeredCacheGrps.entrySet()) { + Integer grpId = e.getKey(); + + if (!cacheGrpAffNodes.containsKey(grpId)) + cacheGrpAffNodes.put(grpId, Collections.emptyList()); + } + } + + if (allCacheNodes.size() != registeredCaches.size()) { + for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) { + Integer cacheId = CU.cacheId(entry.getKey()); + + if (!allCacheNodes.containsKey(cacheId)) + allCacheNodes.put(cacheId, Collections.emptyList()); + } + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/37fedb35/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index 30bf113..cddd3c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -31,21 +31,25 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.binary.BinaryObject; -import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -80,12 +84,6 @@ public class GridAffinityProcessor extends GridProcessorAdapter { /** Affinity map cleanup delay (ms). */ private static final long AFFINITY_MAP_CLEAN_UP_DELAY = 3000; - /** Retries to get affinity in case of error. */ - private static final int ERROR_RETRIES = 3; - - /** Time to wait between errors (in milliseconds). */ - private static final long ERROR_WAIT = 500; - /** Log. */ private final IgniteLogger log; @@ -442,86 +440,74 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } } - Collection<ClusterNode> cacheNodes = ctx.discovery().cacheNodes(cacheName, topVer); - - if (F.isEmpty(cacheNodes)) - return null; - - GridFutureAdapter<AffinityInfo> fut0 = new GridFutureAdapter<>(); - - IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0); + DiscoCache discoCache = ctx.discovery().discoCache(topVer); - if (old != null) - return old.get(); + if (discoCache == null) + throw new IgniteCheckedException("Failed to resolve cluster topology: " + topVer); - int max = ERROR_RETRIES; - int cnt = 0; + if (!discoCache.cacheStarted(cacheName)) + return null; - Iterator<ClusterNode> it = cacheNodes.iterator(); + DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheName); - // We are here because affinity has not been fetched yet, or cache mode is LOCAL. - while (true) { - cnt++; + if (desc == null) { + if (ctx.clientDisconnected()) + throw new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), + "Failed to get affinity mapping, client disconnected."); - if (!it.hasNext()) - it = cacheNodes.iterator(); + return null; + } - // Double check since we deal with dynamic view. - if (!it.hasNext()) - // Exception will be caught in this method. - throw new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName); + if (desc.cacheConfiguration().getCacheMode() == LOCAL) + throw new IgniteCheckedException("Failed to map keys for LOCAL cache: " + cacheName); - ClusterNode n = it.next(); + GridFutureAdapter<AffinityInfo> fut0 = new GridFutureAdapter<>(); - CacheMode mode = ctx.cache().cacheMode(cacheName); + IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0); - if (mode == null) { - if (ctx.clientDisconnected()) - throw new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), - "Failed to get affinity mapping, client disconnected."); + if (old != null) + return old.get(); - throw new IgniteCheckedException("No cache nodes in topology for cache name: " + cacheName); - } + List<ClusterNode> cacheNodes = discoCache.cacheNodes(cacheName); - // Map all keys to a single node, if the cache mode is LOCAL. - if (mode == LOCAL) { - fut0.onDone(new IgniteCheckedException("Failed to map keys for LOCAL cache.")); + for (int i = 0; i < cacheNodes.size(); i++) { + ClusterNode node = cacheNodes.get(i); - // Will throw exception. - fut0.get(); - } + if (!discoCache.alive(node.id())) + continue; try { - // Resolve cache context for remote node. - // Set affinity function before counting down on latch. - fut0.onDone(affinityInfoFromNode(cacheName, topVer, n)); + fut0.onDone(affinityInfoFromNode(cacheName, topVer, node)); break; } catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to get affinity from node (will retry) [cache=" + cacheName + - ", node=" + U.toShortString(n) + ", msg=" + e.getMessage() + ']'); - - if (cnt < max) { - U.sleep(ERROR_WAIT); + if (e instanceof ClusterTopologyCheckedException || X.hasCause(e, ClusterTopologyException.class)) { + if (log.isDebugEnabled()) + log.debug("Failed to get affinity from node, node failed [cache=" + cacheName + + ", node=" + node.id() + ", msg=" + e.getMessage() + ']'); continue; } - affMap.remove(key, fut0); + if (log.isDebugEnabled()) + log.debug("Failed to get affinity from node [cache=" + cacheName + + ", node=" + node.id() + ", msg=" + e.getMessage() + ']'); - fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e)); + fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + node.id(), e)); break; } catch (RuntimeException | Error e) { - fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + n, e)); + fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + node.id(), e)); break; } } + if (!fut0.isDone()) + fut0.onDone(new ClusterGroupEmptyCheckedException("Failed to get cache affinity, all cache nodes failed: " + cacheName)); + return fut0.get(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/37fedb35/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java index baaf503..f1d4740 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java @@ -154,8 +154,6 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { */ @Test public void testAffinityCallNoServerNode() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1741"); - startGridsMultiThreaded(SRVS + 1); final Integer key = 1; @@ -165,7 +163,7 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { assertTrue(client.configuration().isClientMode()); final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { - @Override public Object call() throws Exception { + @Override public Object call() { for (int i = 0; i < SRVS; ++i) stopGrid(i, false);
