Repository: ignite Updated Branches: refs/heads/ignite-1741 96860ed13 -> d87690d6d
ignite-1741 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d87690d6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d87690d6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d87690d6 Branch: refs/heads/ignite-1741 Commit: d87690d6d7f4a3bdbc4818e70d8cc3f5116bca80 Parents: 96860ed Author: sboikov <[email protected]> Authored: Wed Dec 19 13:26:46 2018 +0300 Committer: sboikov <[email protected]> Committed: Wed Dec 19 13:26:46 2018 +0300 ---------------------------------------------------------------------- .../affinity/GridAffinityProcessor.java | 169 ++++++++++++------- .../processors/task/GridTaskWorker.java | 14 +- .../cache/CacheAffinityCallSelfTest.java | 31 +++- 3 files changed, 148 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d87690d6/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index cddd3c1..a290d83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -58,6 +58,8 @@ import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridTuple3; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; 
import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; @@ -381,17 +383,25 @@ public class GridAffinityProcessor extends GridProcessorAdapter { return affInfo != null ? affinityMap(affInfo, keys) : Collections.<ClusterNode, Collection<K>>emptyMap(); } - /** * @param cacheName Cache name. * @param topVer Topology version. * @return Affinity cache. * @throws IgniteCheckedException In case of error. */ - @SuppressWarnings("ErrorNotRethrown") @Nullable private AffinityInfo affinityCache(final String cacheName, AffinityTopologyVersion topVer) throws IgniteCheckedException { + return affinityCacheFuture(cacheName, topVer).get(); + } + /** + * @param cacheName Cache name. + * @param topVer Topology version. + * @return Affinity cache. + * @throws IgniteCheckedException In case of error. + */ + public IgniteInternalFuture<AffinityInfo> affinityCacheFuture(final String cacheName, AffinityTopologyVersion topVer) + throws IgniteCheckedException { assert cacheName != null; AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, topVer); @@ -399,7 +409,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { IgniteInternalFuture<AffinityInfo> fut = affMap.get(key); if (fut != null) - return fut.get(); + return fut; GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); @@ -414,7 +424,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { cctx.gate().enter(); } catch (IllegalStateException ignored) { - return null; + return new GridFinishedFuture<>(null); } try { @@ -428,12 +438,14 @@ public class GridAffinityProcessor extends GridProcessorAdapter { assign, cctx.cacheObjectContext()); - IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, new GridFinishedFuture<>(info)); + GridFinishedFuture<AffinityInfo> fut0 = new GridFinishedFuture<>(info); + + IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0); if (old != null) - info = 
old.get(); + return old; - return info; + return fut0; } finally { cctx.gate().leave(); @@ -443,72 +455,107 @@ public class GridAffinityProcessor extends GridProcessorAdapter { DiscoCache discoCache = ctx.discovery().discoCache(topVer); if (discoCache == null) - throw new IgniteCheckedException("Failed to resolve cluster topology: " + topVer); + return new GridFinishedFuture<>(new IgniteCheckedException("Failed to resolve cluster topology: " + topVer)); if (!discoCache.cacheStarted(cacheName)) - return null; + return new GridFinishedFuture<>(null); + + List<ClusterNode> cacheNodes = discoCache.cacheNodes(cacheName); DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheName); - if (desc == null) { + if (desc == null || F.isEmpty(cacheNodes)) { if (ctx.clientDisconnected()) - throw new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), - "Failed to get affinity mapping, client disconnected."); + return new GridFinishedFuture<>(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), + "Failed to get affinity mapping, client disconnected.")); - return null; + return new GridFinishedFuture<>(null); } if (desc.cacheConfiguration().getCacheMode() == LOCAL) - throw new IgniteCheckedException("Failed to map keys for LOCAL cache: " + cacheName); + return new GridFinishedFuture<>(new IgniteCheckedException("Failed to map keys for LOCAL cache: " + cacheName)); - GridFutureAdapter<AffinityInfo> fut0 = new GridFutureAdapter<>(); + AffinityFuture fut0 = new AffinityFuture(cacheName, topVer, cacheNodes); IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0); if (old != null) - return old.get(); + return old; - List<ClusterNode> cacheNodes = discoCache.cacheNodes(cacheName); + fut0.getAffinityFromNextNode(); - for (int i = 0; i < cacheNodes.size(); i++) { - ClusterNode node = cacheNodes.get(i); + return fut0; + } - if (!discoCache.alive(node.id())) - continue; + /** + * + */ + private class 
AffinityFuture extends GridFutureAdapter<AffinityInfo> { + /** */ + private final String cacheName; - try { - fut0.onDone(affinityInfoFromNode(cacheName, topVer, node)); + /** */ + private final AffinityTopologyVersion topVer; - break; - } - catch (IgniteCheckedException e) { - if (e instanceof ClusterTopologyCheckedException || X.hasCause(e, ClusterTopologyException.class)) { - if (log.isDebugEnabled()) - log.debug("Failed to get affinity from node, node failed [cache=" + cacheName + - ", node=" + node.id() + ", msg=" + e.getMessage() + ']'); + /** */ + private final List<ClusterNode> cacheNodes; + /** */ + private int nodeIdx; + + /** + * @param cacheName Cache name. + * @param topVer Topology version. + * @param cacheNodes Cache nodes. + */ + AffinityFuture(String cacheName, AffinityTopologyVersion topVer, List<ClusterNode> cacheNodes) { + this.cacheName = cacheName; + this.topVer = topVer; + this.cacheNodes = cacheNodes; + } + + /** + * + */ + void getAffinityFromNextNode() { + while (nodeIdx < cacheNodes.size()) { + final ClusterNode node = cacheNodes.get(nodeIdx); + + nodeIdx++; + + if (!ctx.discovery().alive(node.id())) continue; - } - if (log.isDebugEnabled()) - log.debug("Failed to get affinity from node [cache=" + cacheName + - ", node=" + node.id() + ", msg=" + e.getMessage() + ']'); + affinityInfoFromNode(cacheName, topVer, node).listen(new CI1<IgniteInternalFuture<AffinityInfo>>() { + @Override public void apply(IgniteInternalFuture<AffinityInfo> fut) { + try { + onDone(fut.get()); + } + catch (IgniteCheckedException e) { + if (e instanceof ClusterTopologyCheckedException || X.hasCause(e, ClusterTopologyException.class)) { + if (log.isDebugEnabled()) + log.debug("Failed to get affinity from node, node failed [cache=" + cacheName + + ", node=" + node.id() + ", msg=" + e.getMessage() + ']'); - fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + node.id(), e)); + getAffinityFromNextNode(); - break; - } - catch 
(RuntimeException | Error e) { - fut0.onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + node.id(), e)); + return; + } - break; - } - } + if (log.isDebugEnabled()) + log.debug("Failed to get affinity from node [cache=" + cacheName + + ", node=" + node.id() + ", msg=" + e.getMessage() + ']'); - if (!fut0.isDone()) - fut0.onDone(new ClusterGroupEmptyCheckedException("Failed to get cache affinity, all cache nodes failed: " + cacheName)); + onDone(new IgniteCheckedException("Failed to get affinity mapping from node: " + node.id(), e)); + } + } + }); - return fut0.get(); + return; + } + + onDone(new ClusterGroupEmptyCheckedException("Failed to get cache affinity, all cache nodes failed: " + cacheName)); + } } /** @@ -517,26 +564,30 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @param cacheName Name of cache on which affinity is requested. * @param topVer Topology version. * @param n Node from which affinity is requested. - * @return Affinity cached function. - * @throws IgniteCheckedException If either local or remote node cannot get deployment for affinity objects. + * @return Affinity future. 
*/ - private AffinityInfo affinityInfoFromNode(String cacheName, AffinityTopologyVersion topVer, ClusterNode n) - throws IgniteCheckedException { - GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = ctx.closure() - .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/, 0, false).get(); + private IgniteInternalFuture<AffinityInfo> affinityInfoFromNode(String cacheName, AffinityTopologyVersion topVer, ClusterNode n) { + IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>> fut = ctx.closure() + .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/, 0, false); + + return fut.chain(new CX1<IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>>, AffinityInfo>() { + @Override public AffinityInfo applyx(IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment>> fut) throws IgniteCheckedException { + GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = fut.get(); - AffinityFunction f = (AffinityFunction)unmarshall(ctx, n.id(), t.get1()); - AffinityKeyMapper m = (AffinityKeyMapper)unmarshall(ctx, n.id(), t.get2()); + AffinityFunction f = (AffinityFunction)unmarshall(ctx, n.id(), t.get1()); + AffinityKeyMapper m = (AffinityKeyMapper)unmarshall(ctx, n.id(), t.get2()); - assert m != null; + assert m != null; - // Bring to initial state. - f.reset(); - m.reset(); + // Bring to initial state. 
+ f.reset(); + m.reset(); - CacheConfiguration ccfg = ctx.cache().cacheConfiguration(cacheName); + CacheConfiguration ccfg = ctx.cache().cacheConfiguration(cacheName); - return new AffinityInfo(f, m, t.get3(), ctx.cacheObjects().contextForCache(ccfg)); + return new AffinityInfo(f, m, t.get3(), ctx.cacheObjects().contextForCache(ccfg)); + } + }); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d87690d6/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 3eef2b6..2568b36 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -927,9 +927,21 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { mapTopVer = ctx.cache().context().exchange().readyAffinityVersion(); affFut = ctx.cache().context().exchange().lastTopologyFuture(); + + if (affFut == null || affFut.isDone()) { + affFut = null; + + // Need to asynchronously fetch affinity if cache is not started on node.
+ if (affCacheName != null && ctx.cache().internalCache(affCacheName) == null) { + affFut = ctx.affinity().affinityCacheFuture(affCacheName, mapTopVer); + + if (affFut.isDone()) + affFut = null; + } + } } - if (affFut != null && !affFut.isDone()) { + if (affFut != null) { waitForAffTop = true; jobRes.resetResponse(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d87690d6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java index dab045a..c779489 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; import org.apache.ignite.Ignite; @@ -167,8 +168,14 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { assertTrue(client.configuration().isClientMode()); assertNull(client.context().cache().cache(CACHE_NAME)); + final int THREADS = 5; + + CyclicBarrier b = new CyclicBarrier(THREADS + 1); + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { - @Override public Object call() { + @Override public Object call() throws Exception { + b.await(); + for (int i = 0; i < SRVS; ++i) stopGrid(i, false); @@ -177,8 +184,16 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { }); try { - while (!fut.isDone()) - client.compute().affinityCall(CACHE_NAME, key, new CheckCallable(key, null)); + 
GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Void call() throws Exception { + b.await(); + + while (!fut.isDone()) + client.compute().affinityCall(CACHE_NAME, key, new CheckCallable(key, null)); + + return null; + } + }, THREADS, "test-thread"); } catch (ClusterTopologyException e) { log.info("Expected error: " + e); @@ -214,12 +229,16 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest { }); try { - Affinity<Integer> aff = client.affinity(CACHE_NAME); + final Affinity<Integer> aff = client.affinity(CACHE_NAME); assertNull(client.context().cache().cache(CACHE_NAME)); - while (!fut.isDone()) - assertNotNull(aff.mapKeyToNode(key)); + GridTestUtils.runMultiThreaded(new Runnable() { + @Override public void run() { + while (!fut.isDone()) + assertNotNull(aff.mapKeyToNode(key)); + } + }, 5, "test-thread"); } finally { stopAllGrids();
