Repository: ignite
Updated Branches:
  refs/heads/ignite-1741 96860ed13 -> d87690d6d


ignite-1741


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d87690d6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d87690d6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d87690d6

Branch: refs/heads/ignite-1741
Commit: d87690d6d7f4a3bdbc4818e70d8cc3f5116bca80
Parents: 96860ed
Author: sboikov <[email protected]>
Authored: Wed Dec 19 13:26:46 2018 +0300
Committer: sboikov <[email protected]>
Committed: Wed Dec 19 13:26:46 2018 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityProcessor.java         | 169 ++++++++++++-------
 .../processors/task/GridTaskWorker.java         |  14 +-
 .../cache/CacheAffinityCallSelfTest.java        |  31 +++-
 3 files changed, 148 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d87690d6/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index cddd3c1..a290d83 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -58,6 +58,8 @@ import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CX1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
@@ -381,17 +383,25 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
 
         return affInfo != null ? affinityMap(affInfo, keys) : 
Collections.<ClusterNode, Collection<K>>emptyMap();
     }
-
     /**
      * @param cacheName Cache name.
      * @param topVer Topology version.
      * @return Affinity cache.
      * @throws IgniteCheckedException In case of error.
      */
-    @SuppressWarnings("ErrorNotRethrown")
     @Nullable private AffinityInfo affinityCache(final String cacheName, 
AffinityTopologyVersion topVer)
         throws IgniteCheckedException {
+        return affinityCacheFuture(cacheName, topVer).get();
+    }
 
+    /**
+     * @param cacheName Cache name.
+     * @param topVer Topology version.
+     * @return Affinity cache.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public IgniteInternalFuture<AffinityInfo> affinityCacheFuture(final String 
cacheName, AffinityTopologyVersion topVer)
+        throws IgniteCheckedException {
         assert cacheName != null;
 
         AffinityAssignmentKey key = new AffinityAssignmentKey(cacheName, 
topVer);
@@ -399,7 +409,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
         IgniteInternalFuture<AffinityInfo> fut = affMap.get(key);
 
         if (fut != null)
-            return fut.get();
+            return fut;
 
         GridCacheAdapter<Object, Object> cache = 
ctx.cache().internalCache(cacheName);
 
@@ -414,7 +424,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
                 cctx.gate().enter();
             }
             catch (IllegalStateException ignored) {
-                return null;
+                return new GridFinishedFuture<>(null);
             }
 
             try {
@@ -428,12 +438,14 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
                     assign,
                     cctx.cacheObjectContext());
 
-                IgniteInternalFuture<AffinityInfo> old = 
affMap.putIfAbsent(key, new GridFinishedFuture<>(info));
+                GridFinishedFuture<AffinityInfo> fut0 = new 
GridFinishedFuture<>(info);
+
+                IgniteInternalFuture<AffinityInfo> old = 
affMap.putIfAbsent(key, fut0);
 
                 if (old != null)
-                    info = old.get();
+                    return old;
 
-                return info;
+                return fut0;
             }
             finally {
                 cctx.gate().leave();
@@ -443,72 +455,107 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
         DiscoCache discoCache = ctx.discovery().discoCache(topVer);
 
         if (discoCache == null)
-            throw new IgniteCheckedException("Failed to resolve cluster 
topology: " + topVer);
+            return new GridFinishedFuture<>(new IgniteCheckedException("Failed 
to resolve cluster topology: " + topVer));
 
         if (!discoCache.cacheStarted(cacheName))
-            return null;
+            return new GridFinishedFuture<>(null);
+
+        List<ClusterNode> cacheNodes = discoCache.cacheNodes(cacheName);
 
         DynamicCacheDescriptor desc = ctx.cache().cacheDescriptor(cacheName);
 
-        if (desc == null) {
+        if (desc == null || F.isEmpty(cacheNodes)) {
             if (ctx.clientDisconnected())
-                throw new 
IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
-                        "Failed to get affinity mapping, client 
disconnected.");
+                return new GridFinishedFuture<>(new 
IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+                        "Failed to get affinity mapping, client 
disconnected."));
 
-            return null;
+            return new GridFinishedFuture<>(null);
         }
 
         if (desc.cacheConfiguration().getCacheMode() == LOCAL)
-            throw new IgniteCheckedException("Failed to map keys for LOCAL 
cache: " + cacheName);
+            return new GridFinishedFuture<>(new IgniteCheckedException("Failed 
to map keys for LOCAL cache: " + cacheName));
 
-        GridFutureAdapter<AffinityInfo> fut0 = new GridFutureAdapter<>();
+        AffinityFuture fut0 = new AffinityFuture(cacheName, topVer, 
cacheNodes);
 
         IgniteInternalFuture<AffinityInfo> old = affMap.putIfAbsent(key, fut0);
 
         if (old != null)
-            return old.get();
+            return old;
 
-        List<ClusterNode> cacheNodes = discoCache.cacheNodes(cacheName);
+        fut0.getAffinityFromNextNode();
 
-        for (int i = 0; i < cacheNodes.size(); i++) {
-            ClusterNode node = cacheNodes.get(i);
+        return fut0;
+    }
 
-            if (!discoCache.alive(node.id()))
-                continue;
+    /**
+     *
+     */
+    private class AffinityFuture extends GridFutureAdapter<AffinityInfo> {
+        /** */
+        private final String cacheName;
 
-            try {
-                fut0.onDone(affinityInfoFromNode(cacheName, topVer, node));
+        /** */
+        private final AffinityTopologyVersion topVer;
 
-                break;
-            }
-            catch (IgniteCheckedException e) {
-                if (e instanceof ClusterTopologyCheckedException || 
X.hasCause(e, ClusterTopologyException.class)) {
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to get affinity from node, node 
failed [cache=" + cacheName +
-                                ", node=" + node.id() + ", msg=" + 
e.getMessage() + ']');
+        /** */
+        private final List<ClusterNode> cacheNodes;
 
+        /** */
+        private int nodeIdx;
+
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Topology version.
+         * @param cacheNodes Cache nodes.
+         */
+        AffinityFuture(String cacheName, AffinityTopologyVersion topVer, 
List<ClusterNode> cacheNodes) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
+            this.cacheNodes = cacheNodes;
+        }
+
+        /**
+         *
+         */
+        void getAffinityFromNextNode() {
+            while (nodeIdx < cacheNodes.size()) {
+                final ClusterNode node = cacheNodes.get(nodeIdx);
+
+                nodeIdx++;
+
+                if (!ctx.discovery().alive(node.id()))
                     continue;
-                }
 
-                if (log.isDebugEnabled())
-                    log.debug("Failed to get affinity from node [cache=" + 
cacheName +
-                        ", node=" + node.id() + ", msg=" + e.getMessage() + 
']');
+                affinityInfoFromNode(cacheName, topVer, node).listen(new 
CI1<IgniteInternalFuture<AffinityInfo>>() {
+                    @Override public void 
apply(IgniteInternalFuture<AffinityInfo> fut) {
+                        try {
+                            onDone(fut.get());
+                        }
+                        catch (IgniteCheckedException e) {
+                            if (e instanceof ClusterTopologyCheckedException 
|| X.hasCause(e, ClusterTopologyException.class)) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to get affinity from 
node, node failed [cache=" + cacheName +
+                                            ", node=" + node.id() + ", msg=" + 
e.getMessage() + ']');
 
-                fut0.onDone(new IgniteCheckedException("Failed to get affinity 
mapping from node: " + node.id(), e));
+                                getAffinityFromNextNode();
 
-                break;
-            }
-            catch (RuntimeException | Error e) {
-                fut0.onDone(new IgniteCheckedException("Failed to get affinity 
mapping from node: " + node.id(), e));
+                                return;
+                            }
 
-                break;
-            }
-        }
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to get affinity from node 
[cache=" + cacheName +
+                                        ", node=" + node.id() + ", msg=" + 
e.getMessage() + ']');
 
-        if (!fut0.isDone())
-            fut0.onDone(new ClusterGroupEmptyCheckedException("Failed to get 
cache affinity, all cache nodes failed: " + cacheName));
+                            onDone(new IgniteCheckedException("Failed to get 
affinity mapping from node: " + node.id(), e));
+                        }
+                    }
+                });
 
-        return fut0.get();
+                return;
+            }
+
+            onDone(new ClusterGroupEmptyCheckedException("Failed to get cache 
affinity, all cache nodes failed: " + cacheName));
+        }
     }
 
     /**
@@ -517,26 +564,30 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
      * @param cacheName Name of cache on which affinity is requested.
      * @param topVer Topology version.
      * @param n Node from which affinity is requested.
-     * @return Affinity cached function.
-     * @throws IgniteCheckedException If either local or remote node cannot 
get deployment for affinity objects.
+     * @return Affinity future.
      */
-    private AffinityInfo affinityInfoFromNode(String cacheName, 
AffinityTopologyVersion topVer, ClusterNode n)
-        throws IgniteCheckedException {
-        GridTuple3<GridAffinityMessage, GridAffinityMessage, 
GridAffinityAssignment> t = ctx.closure()
-            .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), 
F.asList(n), true/*system pool*/, 0, false).get();
+    private IgniteInternalFuture<AffinityInfo> affinityInfoFromNode(String 
cacheName, AffinityTopologyVersion topVer, ClusterNode n) {
+        IgniteInternalFuture<GridTuple3<GridAffinityMessage, 
GridAffinityMessage, GridAffinityAssignment>> fut = ctx.closure()
+            .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), 
F.asList(n), true/*system pool*/, 0, false);
+
+        return fut.chain(new 
CX1<IgniteInternalFuture<GridTuple3<GridAffinityMessage, GridAffinityMessage, 
GridAffinityAssignment>>, AffinityInfo>() {
+            @Override public AffinityInfo 
applyx(IgniteInternalFuture<GridTuple3<GridAffinityMessage, 
GridAffinityMessage, GridAffinityAssignment>> fut) throws 
IgniteCheckedException {
+                GridTuple3<GridAffinityMessage, GridAffinityMessage, 
GridAffinityAssignment> t = fut.get();
 
-        AffinityFunction f = (AffinityFunction)unmarshall(ctx, n.id(), 
t.get1());
-        AffinityKeyMapper m = (AffinityKeyMapper)unmarshall(ctx, n.id(), 
t.get2());
+                AffinityFunction f = (AffinityFunction)unmarshall(ctx, n.id(), 
t.get1());
+                AffinityKeyMapper m = (AffinityKeyMapper)unmarshall(ctx, 
n.id(), t.get2());
 
-        assert m != null;
+                assert m != null;
 
-        // Bring to initial state.
-        f.reset();
-        m.reset();
+                // Bring to initial state.
+                f.reset();
+                m.reset();
 
-        CacheConfiguration ccfg = ctx.cache().cacheConfiguration(cacheName);
+                CacheConfiguration ccfg = 
ctx.cache().cacheConfiguration(cacheName);
 
-        return new AffinityInfo(f, m, t.get3(), 
ctx.cacheObjects().contextForCache(ccfg));
+                return new AffinityInfo(f, m, t.get3(), 
ctx.cacheObjects().contextForCache(ccfg));
+            }
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d87690d6/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 3eef2b6..2568b36 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -927,9 +927,21 @@ class GridTaskWorker<T, R> extends GridWorker implements 
GridTimeoutObject {
                                     mapTopVer = 
ctx.cache().context().exchange().readyAffinityVersion();
 
                                     affFut = 
ctx.cache().context().exchange().lastTopologyFuture();
+
+                                    if (affFut == null || affFut.isDone()) {
+                                        affFut = null;
+
+                                        // Need asynchronosly fetch affinity 
if cache is not started on node .
+                                        if (affCacheName != null && 
ctx.cache().internalCache(affCacheName) == null) {
+                                            affFut = 
ctx.affinity().affinityCacheFuture(affCacheName, mapTopVer);
+
+                                            if (affFut.isDone())
+                                                affFut = null;
+                                        }
+                                    }
                                 }
 
-                                if (affFut != null && !affFut.isDone()) {
+                                if (affFut != null) {
                                     waitForAffTop = true;
 
                                     jobRes.resetResponse();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d87690d6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
index dab045a..c779489 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.ignite.Ignite;
@@ -167,8 +168,14 @@ public class CacheAffinityCallSelfTest extends 
GridCommonAbstractTest {
         assertTrue(client.configuration().isClientMode());
         assertNull(client.context().cache().cache(CACHE_NAME));
 
+        final int THREADS = 5;
+
+        CyclicBarrier b = new CyclicBarrier(THREADS + 1);
+
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new 
Callable<Object>() {
-            @Override public Object call() {
+            @Override public Object call() throws Exception {
+                b.await();
+
                 for (int i = 0; i < SRVS; ++i)
                     stopGrid(i, false);
 
@@ -177,8 +184,16 @@ public class CacheAffinityCallSelfTest extends 
GridCommonAbstractTest {
         });
 
         try {
-            while (!fut.isDone())
-                client.compute().affinityCall(CACHE_NAME, key, new 
CheckCallable(key, null));
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Void call() throws Exception {
+                    b.await();
+
+                    while (!fut.isDone())
+                        client.compute().affinityCall(CACHE_NAME, key, new 
CheckCallable(key, null));
+
+                    return null;
+                }
+            }, THREADS, "test-thread");
         }
         catch (ClusterTopologyException e) {
             log.info("Expected error: " + e);
@@ -214,12 +229,16 @@ public class CacheAffinityCallSelfTest extends 
GridCommonAbstractTest {
         });
 
         try {
-            Affinity<Integer> aff = client.affinity(CACHE_NAME);
+            final Affinity<Integer> aff = client.affinity(CACHE_NAME);
 
             assertNull(client.context().cache().cache(CACHE_NAME));
 
-            while (!fut.isDone())
-                assertNotNull(aff.mapKeyToNode(key));
+            GridTestUtils.runMultiThreaded(new Runnable() {
+                @Override public void run() {
+                    while (!fut.isDone())
+                        assertNotNull(aff.mapKeyToNode(key));
+                }
+            }, 5, "test-thread");
         }
         finally {
             stopAllGrids();

Reply via email to