IGNITE-2465 - Fixed race in load cache closure - Fixes #431. Signed-off-by: Alexey Goncharuk <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/500bd3ab Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/500bd3ab Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/500bd3ab Branch: refs/heads/ignite-2541 Commit: 500bd3ab576830f8160eb66274590b7684a39599 Parents: e7de923 Author: ashutak <[email protected]> Authored: Wed Feb 3 14:56:42 2016 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Feb 3 14:56:42 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 100 ++++++++++++++++++- 1 file changed, 96 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/500bd3ab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 69abc54..2c3a197 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -70,6 +70,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; +import org.apache.ignite.internal.ComputeTaskInternalFuture; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; @@ -101,6 +102,7 @@ import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanMap; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -128,6 +130,7 @@ import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.mxbean.CacheMetricsMXBean; @@ -166,6 +169,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** Maximum number of retries when topology changes. */ public static final int MAX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, 100); + /** */ + public static final IgniteProductVersion LOAD_CACHE_JOB_SINCE = IgniteProductVersion.fromString("1.5.7"); + /** Deserialization stash. */ private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String, String>>() { @@ -3737,7 +3743,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws IgniteCheckedException { - ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()); + ClusterGroup oldNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()) + .forPredicate(new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) < 0; + } + }); + + ClusterGroup newNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()) + .forPredicate(new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) >= 0; + } + }); ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true); @@ -3745,9 +3763,27 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null; - return ctx.kernalContext().closure().callAsync(BROADCAST, - Arrays.asList(new LoadCacheClosure<>(ctx.name(), p, args, plc)), - nodes.nodes()); + GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>(); + + if (!F.isEmpty(oldNodes.nodes())) { + ComputeTaskInternalFuture oldNodesFut = ctx.kernalContext().closure().callAsync(BROADCAST, + Arrays.asList(new LoadCacheClosure<>(ctx.name(), p, args, plc)), + oldNodes.nodes()); + + fut.add(oldNodesFut); + } + + if (!F.isEmpty(newNodes.nodes())) { + ComputeTaskInternalFuture newNodesFut = ctx.kernalContext().closure().callAsync(BROADCAST, + Arrays.asList(new LoadCacheJob<>(ctx.name(), ctx.affinity().affinityTopologyVersion(), p, args, plc)), + newNodes.nodes()); + + fut.add(newNodesFut); + } + + fut.markInitialized(); + + return fut; } /** @@ -5498,6 +5534,62 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * Internal callable for global size calculation. + */ + @GridInternal + private static class LoadCacheJob<K, V> extends TopologyVersionAwareJob { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteBiPredicate<K, V> p; + + /** */ + private final Object[] args; + + /** */ + private final ExpiryPolicy plc; + + /** + * @param cacheName Cache name. + * @param topVer Affinity topology version. + * @param p Predicate. + * @param args Arguments. + * @param plc Policy. + */ + private LoadCacheJob(String cacheName, AffinityTopologyVersion topVer, IgniteBiPredicate<K, V> p, Object[] args, + ExpiryPolicy plc) { + super(cacheName, topVer); + + this.p = p; + this.args = args; + this.plc = plc; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { + try { + assert cache != null : "Failed to get a cache [cacheName=" + cacheName + ", topVer=" + topVer + "]"; + + if (plc != null) + cache = cache.withExpiryPolicy(plc); + + cache.localLoadCache(p, args); + + return null; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(LoadCacheJob.class, this); + } + } + + /** * Holder for last async operation future. */ protected static class FutureHolder {
