Fixed unexpected object deserialization when local store is enabled.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bd6a67f2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bd6a67f2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bd6a67f2 Branch: refs/heads/master Commit: bd6a67f21ef133a7a63d1f0fc3a8538ce33783f3 Parents: bc143c6 Author: dkarachentsev <[email protected]> Authored: Thu May 5 17:06:22 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed May 11 09:57:47 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 195 +++++++++++++++++-- .../distributed/dht/GridDhtCacheAdapter.java | 4 +- .../distributed/near/GridNearCacheAdapter.java | 4 +- 3 files changed, 178 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bd6a67f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 8c1a750..d81cbd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -168,6 +168,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** */ public static final IgniteProductVersion LOAD_CACHE_JOB_SINCE = IgniteProductVersion.fromString("1.5.7"); + /** */ + public static final IgniteProductVersion LOAD_CACHE_JOB_V2_SINCE = IgniteProductVersion.fromString("1.5.19"); + /** Deserialization stash. */ private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String, String>>() { @@ -3405,6 +3408,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry(); + final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); + if (p != null) ctx.kernalContext().resource().injectGeneric(p); @@ -3417,6 +3422,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ldr.receiver(new IgniteDrDataStreamerCacheUpdater()); + ldr.keepBinary(keepBinary); + LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, plc); ctx.store().loadCache(c, args); @@ -3527,18 +3534,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null; - if (replaceExisting) { - if (ctx.store().isLocal()) { - Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); + final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); - if (nodes.isEmpty()) - return new GridFinishedFuture<>(); - - return ctx.closures().callAsyncNoFailover(BROADCAST, - new LoadKeysCallable<>(ctx.name(), keys, true, plc), - nodes, - true); - } + if (replaceExisting) { + if (ctx.store().isLocal()) + return runLoadKeysCallable(keys, plc, keepBinary, true); else { return ctx.closures().callLocalSafe(new Callable<Void>() { @Override public Void call() throws Exception { @@ -3549,14 +3549,41 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V }); } } - else { - Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); + else + return runLoadKeysCallable(keys, plc, keepBinary, false); + } - if (nodes.isEmpty()) - return new GridFinishedFuture<>(); + /** + * Run load keys callable on appropriate nodes. + * + * @param keys Keys. + * @param plc Expiry policy. + * @param keepBinary Keep binary flag. Will be ignored for releases older than {@link #LOAD_CACHE_JOB_V2_SINCE}. + * @return Operation future. + */ + private IgniteInternalFuture<?> runLoadKeysCallable(final Set<? extends K> keys, final ExpiryPolicy plc, + final boolean keepBinary, final boolean update) { + Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); + + if (nodes.isEmpty()) + return new GridFinishedFuture<>(); + + Collection<ClusterNode> oldNodes = ctx.grid().cluster().forDataNodes(name()).forPredicate( + new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE) < 0; + } + }).nodes(); + if (oldNodes.isEmpty()) { return ctx.closures().callAsyncNoFailover(BROADCAST, - new LoadKeysCallable<>(ctx.name(), keys, false, plc), + new LoadKeysCallableV2<>(ctx.name(), keys, update, plc, keepBinary), + nodes, + true); + } + else { + return ctx.closures().callAsyncNoFailover(BROADCAST, + new LoadKeysCallable<>(ctx.name(), keys, update, plc), nodes, true); } @@ -3598,8 +3625,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param plc Optional expiry policy. * @throws IgniteCheckedException If failed. */ - public void localLoad(Collection<? extends K> keys, - @Nullable ExpiryPolicy plc) + public void localLoad(Collection<? extends K> keys, @Nullable ExpiryPolicy plc, final boolean keepBinary) throws IgniteCheckedException { final boolean replicate = ctx.isDrEnabled(); final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); @@ -3614,6 +3640,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V try { ldr.skipStore(true); + ldr.keepBinary(keepBinary); + ldr.receiver(new IgniteDrDataStreamerCacheUpdater()); LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, plc0); @@ -3670,7 +3698,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ClusterGroup newNodes = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()) .forPredicate(new IgnitePredicate<ClusterNode>() { @Override public boolean apply(ClusterNode node) { - return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) >= 0; + return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) >= 0 && + node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE) < 0; + } + }); + + ClusterGroup newNodesV2 = ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name()) + .forPredicate(new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE) >= 0; } }); @@ -3699,6 +3735,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V fut.add(newNodesFut); } + if (!F.isEmpty(newNodesV2.nodes())) { + final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); + + ComputeTaskInternalFuture newNodesV2Fut = ctx.kernalContext().closure().callAsync(BROADCAST, + Collections.singletonList( + new LoadCacheJobV2<>(ctx.name(), ctx.affinity().affinityTopologyVersion(), p, args, plc, keepBinary)), + newNodesV2.nodes()); + + fut.add(newNodesV2Fut); + } + fut.markInitialized(); return fut; @@ -5541,6 +5588,49 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * Load cache job that with keepBinary flag. + */ + private static class LoadCacheJobV2<K, V> extends LoadCacheJob<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final boolean keepBinary; + + /** + * Constructor. + * + * @param cacheName Cache name. + * @param topVer Affinity topology version. + * @param p Predicate. + * @param loadArgs Arguments. + * @param keepBinary Keep binary flag. + */ + public LoadCacheJobV2(final String cacheName, final AffinityTopologyVersion topVer, + final IgniteBiPredicate<K, V> p, final Object[] loadArgs, final ExpiryPolicy plc, + final boolean keepBinary) { + super(cacheName, topVer, p, loadArgs, plc); + + this.keepBinary = keepBinary; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { + assert cache != null : "Failed to get a cache [cacheName=" + cacheName + ", topVer=" + topVer + "]"; + + if (keepBinary) + cache = cache.keepBinary(); + + return super.localExecute(cache); + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(LoadCacheJobV2.class, this); + } + } + + /** * Holder for last async operation future. */ protected static class FutureHolder { @@ -5752,7 +5842,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param cacheName Cache name. * @param keys Keys. * @param update If {@code true} calls {@link #localLoadAndUpdate(Collection)} - * otherwise {@link #localLoad(Collection, ExpiryPolicy)}. + * otherwise {@link #localLoad(Collection, ExpiryPolicy, boolean)}. * @param plc Expiry policy. */ LoadKeysCallable(String cacheName, @@ -5767,6 +5857,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Void call() throws Exception { + return call0(false); + } + + /** + * Internal call routine. + * + * @param keepBinary Keep binary flag. + * @return Result (always {@code null}). + * @throws Exception If failed. + */ + protected Void call0(boolean keepBinary) throws Exception { GridCacheAdapter<K, V> cache = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); assert cache != null : cacheName; @@ -5777,7 +5878,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (update) cache.localLoadAndUpdate(keys); else - cache.localLoad(keys, plc); + cache.localLoad(keys, plc, keepBinary); } finally { cache.context().gate().leave(); @@ -5812,6 +5913,58 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * */ + static class LoadKeysCallableV2<K, V> extends LoadKeysCallable<K, V> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private boolean keepBinary; + + /** + * Required by {@link Externalizable}. + */ + public LoadKeysCallableV2() { + // No-op. + } + + /** + * @param cacheName Cache name. + * @param keys Keys. + * @param update If {@code true} calls {@link #localLoadAndUpdate(Collection)} + * otherwise {@link #localLoad(Collection, ExpiryPolicy, boolean)}. + * @param plc Expiry policy. + * @param keepBinary Keep binary flag. + */ + LoadKeysCallableV2(final String cacheName, final Collection<? extends K> keys, final boolean update, + final ExpiryPolicy plc, final boolean keepBinary) { + super(cacheName, keys, update, plc); + + this.keepBinary = keepBinary; + } + + /** {@inheritDoc} */ + @Override public Void call() throws Exception { + return call0(keepBinary); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(final ObjectOutput out) throws IOException { + super.writeExternal(out); + + out.writeBoolean(keepBinary); + } + + /** {@inheritDoc} */ + @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + keepBinary = in.readBoolean(); + } + } + + /** + * + */ private class LocalStoreLoadClosure extends CIX3<KeyCacheObject, Object, GridCacheVersion> { /** */ final IgniteBiPredicate<K, V> p; http://git-wip-us.apache.org/repos/asf/ignite/blob/bd6a67f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 5ff674f..9f15bf4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -442,10 +442,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** {@inheritDoc} */ - @Override public void localLoad(Collection<? extends K> keys, final ExpiryPolicy plc) + @Override public void localLoad(Collection<? extends K> keys, final ExpiryPolicy plc, final boolean keepBinary) throws IgniteCheckedException { if (ctx.store().isLocal()) { - super.localLoad(keys, plc); + super.localLoad(keys, plc, keepBinary); return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/bd6a67f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 7971173..dd66a33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -273,8 +273,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public void localLoad(Collection<? extends K> keys, ExpiryPolicy plc) throws IgniteCheckedException { - dht().localLoad(keys, plc); + @Override public void localLoad(Collection<? extends K> keys, ExpiryPolicy plc, boolean keepBinary) throws IgniteCheckedException { + dht().localLoad(keys, plc, keepBinary); } /** {@inheritDoc} */
