IGNITE-4717 Fixed hangs in VisorCacheClearTask. (cherry picked from commit 76f3060)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/95dd7482 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/95dd7482 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/95dd7482 Branch: refs/heads/master Commit: 95dd74829339c05b3f092829a3f84fadd84f67f2 Parents: 0130b09 Author: Andrey Novikov <[email protected]> Authored: Mon Feb 20 18:23:33 2017 +0700 Committer: Andrey Novikov <[email protected]> Committed: Mon Feb 20 18:29:58 2017 +0700 ---------------------------------------------------------------------- .../visor/cache/VisorCacheClearTask.java | 88 +++++--------------- .../visor/compute/VisorGatewayTask.java | 30 ++++++- 2 files changed, 49 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/95dd7482/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java index 1f1a6fb..0c8476f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheClearTask.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.visor.cache; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.compute.ComputeJobContext; import org.apache.ignite.internal.processors.task.GridInternal; @@ -26,7 +25,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorOneNodeTask; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.resources.JobContextResource; @@ -90,17 +88,11 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple< } /** - * @param subJob Sub job to execute asynchronously. + * @param fut Future for asynchronous cache operation. * @param idx Index. * @return {@code true} If subJob was not completed and this job should be suspended. */ - private boolean callAsync(IgniteCallable<Integer> subJob, int idx) { - IgniteCompute compute = ignite.compute(ignite.cluster().forCacheNodes(cacheName)).withAsync(); - - compute.call(subJob); - - IgniteFuture<Integer> fut = compute.future(); - + private boolean callAsync(IgniteFuture<Integer> fut, int idx) { futs[idx] = fut; if (fut.isDone()) @@ -119,16 +111,28 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple< futs = new IgniteFuture[3]; if (futs[0] == null || futs[1] == null || futs[2] == null) { - IgniteCache cache = ignite.cache(cacheName); + IgniteCache cache = ignite.cache(cacheName).withAsync(); + + if (futs[0] == null) { + cache.size(CachePeekMode.PRIMARY); + + if (callAsync(cache.<Integer>future(), 0)) + return null; + } - if (futs[0] == null && callAsync(new VisorCacheSizeCallable(cache), 0)) - return null; + if (futs[1] == null) { + cache.clear(); - if (futs[1] == null && callAsync(new VisorCacheClearCallable(cache), 1)) - return null; + if (callAsync(cache.<Integer>future(), 1)) + return null; + } + + if (futs[2] == null) { + cache.size(CachePeekMode.PRIMARY); - if (futs[2] == null && callAsync(new VisorCacheSizeCallable(cache), 2)) - return null; + if (callAsync(cache.<Integer>future(), 2)) + return null; + } } assert futs[0].isDone() && futs[1].isDone() && futs[2].isDone(); @@ -141,54 +145,4 @@ public class VisorCacheClearTask extends VisorOneNodeTask<String, IgniteBiTuple< return S.toString(VisorCacheClearJob.class, this); } } - - /** - * Callable to get cache size. - */ - @GridInternal - private static class VisorCacheSizeCallable implements IgniteCallable<Integer> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final IgniteCache cache; - - /** - * @param cache Cache to take size from. - */ - private VisorCacheSizeCallable(IgniteCache cache) { - this.cache = cache; - } - - /** {@inheritDoc} */ - @Override public Integer call() throws Exception { - return cache.size(CachePeekMode.PRIMARY); - } - } - - /** - * Callable to clear cache. - */ - @GridInternal - private static class VisorCacheClearCallable implements IgniteCallable<Integer> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final IgniteCache cache; - - /** - * @param cache Cache to clear. - */ - private VisorCacheClearCallable(IgniteCache cache) { - this.cache = cache; - } - - /** {@inheritDoc} */ - @Override public Integer call() throws Exception { - cache.clear(); - - return 0; - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/95dd7482/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java index 2539a26..a64ec6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/compute/VisorGatewayTask.java @@ -29,21 +29,26 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.ComputeJobAdapter; +import org.apache.ignite.compute.ComputeJobContext; import org.apache.ignite.compute.ComputeJobResult; import org.apache.ignite.compute.ComputeJobResultPolicy; import org.apache.ignite.compute.ComputeTask; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.lang.GridTuple3; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.visor.VisorTaskArgument; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.JobContextResource; import org.jetbrains.annotations.Nullable; /** @@ -101,9 +106,16 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> { @IgniteInstanceResource protected transient IgniteEx ignite; + /** Auto-inject job context. */ + @JobContextResource + protected transient ComputeJobContext jobCtx; + /** Arguments count. */ private final int argsCnt; + /** Future for spawned task. */ + private transient IgniteFuture fut; + /** * Create job with specified argument. * @@ -284,6 +296,9 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public Object execute() throws IgniteException { + if (fut != null) + return fut.get(); + String nidsArg = argument(0); String taskName = argument(1); @@ -355,8 +370,19 @@ public class VisorGatewayTask implements ComputeTask<Object[], Object> { } } - return ignite.compute(ignite.cluster().forNodeIds(nids)) - .execute(taskName, new VisorTaskArgument<>(nids, jobArgs, false)); + IgniteCompute comp = ignite.compute(ignite.cluster().forNodeIds(nids)).withAsync(); + + comp.execute(taskName, new VisorTaskArgument<>(nids, jobArgs, false)); + + fut = comp.future(); + + fut.listen(new CI1<IgniteFuture<Object>>() { + @Override public void apply(IgniteFuture<Object> f) { + jobCtx.callcc(); + } + }); + + return jobCtx.holdcc(); } } }
