Fixed unexpected object deserialization when local store is enabled.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bd6a67f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bd6a67f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bd6a67f2

Branch: refs/heads/master
Commit: bd6a67f21ef133a7a63d1f0fc3a8538ce33783f3
Parents: bc143c6
Author: dkarachentsev <[email protected]>
Authored: Thu May 5 17:06:22 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Wed May 11 09:57:47 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 195 +++++++++++++++++--
 .../distributed/dht/GridDhtCacheAdapter.java    |   4 +-
 .../distributed/near/GridNearCacheAdapter.java  |   4 +-
 3 files changed, 178 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bd6a67f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 8c1a750..d81cbd2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -168,6 +168,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     /** */
     public static final IgniteProductVersion LOAD_CACHE_JOB_SINCE = 
IgniteProductVersion.fromString("1.5.7");
 
+    /** */
+    public static final IgniteProductVersion LOAD_CACHE_JOB_V2_SINCE = 
IgniteProductVersion.fromString("1.5.19");
+
     /** Deserialization stash. */
     private static final ThreadLocal<IgniteBiTuple<String, String>> stash = 
new ThreadLocal<IgniteBiTuple<String,
         String>>() {
@@ -3405,6 +3408,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry();
 
+        final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
         if (p != null)
             ctx.kernalContext().resource().injectGeneric(p);
 
@@ -3417,6 +3422,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
                     ldr.receiver(new IgniteDrDataStreamerCacheUpdater());
 
+                    ldr.keepBinary(keepBinary);
+
                     LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, 
ldr, plc);
 
                     ctx.store().loadCache(c, args);
@@ -3527,18 +3534,11 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null;
 
-        if (replaceExisting) {
-            if (ctx.store().isLocal()) {
-                Collection<ClusterNode> nodes = 
ctx.grid().cluster().forDataNodes(name()).nodes();
+        final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
 
-                if (nodes.isEmpty())
-                    return new GridFinishedFuture<>();
-
-                return ctx.closures().callAsyncNoFailover(BROADCAST,
-                    new LoadKeysCallable<>(ctx.name(), keys, true, plc),
-                    nodes,
-                    true);
-            }
+        if (replaceExisting) {
+            if (ctx.store().isLocal())
+                return runLoadKeysCallable(keys, plc, keepBinary, true);
             else {
                 return ctx.closures().callLocalSafe(new Callable<Void>() {
                     @Override public Void call() throws Exception {
@@ -3549,14 +3549,41 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 });
             }
         }
-        else {
-            Collection<ClusterNode> nodes = 
ctx.grid().cluster().forDataNodes(name()).nodes();
+        else
+            return runLoadKeysCallable(keys, plc, keepBinary, false);
+    }
 
-            if (nodes.isEmpty())
-                return new GridFinishedFuture<>();
+    /**
+     * Run load keys callable on appropriate nodes.
+     *
+     * @param keys Keys.
+     * @param plc Expiry policy.
+     * @param keepBinary Keep binary flag. Will be ignored for releases older 
than {@link #LOAD_CACHE_JOB_V2_SINCE}.
+     * @return Operation future.
+     */
+    private IgniteInternalFuture<?> runLoadKeysCallable(final Set<? extends K> 
keys, final ExpiryPolicy plc,
+        final boolean keepBinary, final boolean update) {
+        Collection<ClusterNode> nodes = 
ctx.grid().cluster().forDataNodes(name()).nodes();
+
+        if (nodes.isEmpty())
+            return new GridFinishedFuture<>();
+
+        Collection<ClusterNode> oldNodes = 
ctx.grid().cluster().forDataNodes(name()).forPredicate(
+            new IgnitePredicate<ClusterNode>() {
+                @Override public boolean apply(ClusterNode node) {
+                    return 
node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE) < 0;
+                }
+            }).nodes();
 
+        if (oldNodes.isEmpty()) {
             return ctx.closures().callAsyncNoFailover(BROADCAST,
-                new LoadKeysCallable<>(ctx.name(), keys, false, plc),
+                new LoadKeysCallableV2<>(ctx.name(), keys, update, plc, 
keepBinary),
+                nodes,
+                true);
+        }
+        else {
+            return ctx.closures().callAsyncNoFailover(BROADCAST,
+                new LoadKeysCallable<>(ctx.name(), keys, update, plc),
                 nodes,
                 true);
         }
@@ -3598,8 +3625,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * @param plc Optional expiry policy.
      * @throws IgniteCheckedException If failed.
      */
-    public void localLoad(Collection<? extends K> keys,
-        @Nullable ExpiryPolicy plc)
+    public void localLoad(Collection<? extends K> keys, @Nullable ExpiryPolicy 
plc, final boolean keepBinary)
         throws IgniteCheckedException {
         final boolean replicate = ctx.isDrEnabled();
         final AffinityTopologyVersion topVer = 
ctx.affinity().affinityTopologyVersion();
@@ -3614,6 +3640,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             try {
                 ldr.skipStore(true);
 
+                ldr.keepBinary(keepBinary);
+
                 ldr.receiver(new IgniteDrDataStreamerCacheUpdater());
 
                 LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, 
plc0);
@@ -3670,7 +3698,15 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         ClusterGroup newNodes = 
ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name())
             .forPredicate(new IgnitePredicate<ClusterNode>() {
                 @Override public boolean apply(ClusterNode node) {
-                    return 
node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) >= 0;
+                    return 
node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_SINCE) >= 0 &&
+                        
node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE) < 0;
+                }
+            });
+
+        ClusterGroup newNodesV2 = 
ctx.kernalContext().grid().cluster().forCacheNodes(ctx.name())
+            .forPredicate(new IgnitePredicate<ClusterNode>() {
+                @Override public boolean apply(ClusterNode node) {
+                    return 
node.version().compareToIgnoreTimestamp(LOAD_CACHE_JOB_V2_SINCE) >= 0;
                 }
             });
 
@@ -3699,6 +3735,17 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             fut.add(newNodesFut);
         }
 
+        if (!F.isEmpty(newNodesV2.nodes())) {
+            final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+            ComputeTaskInternalFuture newNodesV2Fut = 
ctx.kernalContext().closure().callAsync(BROADCAST,
+                Collections.singletonList(
+                    new LoadCacheJobV2<>(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), p, args, plc, keepBinary)),
+                newNodesV2.nodes());
+
+            fut.add(newNodesV2Fut);
+        }
+
         fut.markInitialized();
 
         return fut;
@@ -5541,6 +5588,49 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     }
 
     /**
+     * Load cache job with keepBinary flag.
+     */
+    private static class LoadCacheJobV2<K, V> extends LoadCacheJob<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final boolean keepBinary;
+
+        /**
+         * Constructor.
+         *
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         * @param p Predicate.
+         * @param loadArgs Arguments.
+         * @param keepBinary Keep binary flag.
+         */
+        public LoadCacheJobV2(final String cacheName, final 
AffinityTopologyVersion topVer,
+            final IgniteBiPredicate<K, V> p, final Object[] loadArgs, final 
ExpiryPolicy plc,
+            final boolean keepBinary) {
+            super(cacheName, topVer, p, loadArgs, plc);
+
+            this.keepBinary = keepBinary;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object localExecute(@Nullable 
IgniteInternalCache cache) {
+            assert cache != null : "Failed to get a cache [cacheName=" + 
cacheName + ", topVer=" + topVer + "]";
+
+            if (keepBinary)
+                cache = cache.keepBinary();
+
+            return super.localExecute(cache);
+        }
+
+        /** {@inheritDoc} */
+        public String toString() {
+            return S.toString(LoadCacheJobV2.class, this);
+        }
+    }
+
+    /**
      * Holder for last async operation future.
      */
     protected static class FutureHolder {
@@ -5752,7 +5842,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
          * @param cacheName Cache name.
          * @param keys Keys.
          * @param update If {@code true} calls {@link 
#localLoadAndUpdate(Collection)}
-         *        otherwise {@link #localLoad(Collection, ExpiryPolicy)}.
+         *        otherwise {@link #localLoad(Collection, ExpiryPolicy, 
boolean)}.
          * @param plc Expiry policy.
          */
         LoadKeysCallable(String cacheName,
@@ -5767,6 +5857,17 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         /** {@inheritDoc} */
         @Override public Void call() throws Exception {
+            return call0(false);
+        }
+
+        /**
+         * Internal call routine.
+         *
+         * @param keepBinary Keep binary flag.
+         * @return Result (always {@code null}).
+         * @throws Exception If failed.
+         */
+        protected Void call0(boolean keepBinary) throws Exception {
             GridCacheAdapter<K, V> cache = 
((IgniteKernal)ignite).context().cache().internalCache(cacheName);
 
             assert cache != null : cacheName;
@@ -5777,7 +5878,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 if (update)
                     cache.localLoadAndUpdate(keys);
                 else
-                    cache.localLoad(keys, plc);
+                    cache.localLoad(keys, plc, keepBinary);
             }
             finally {
                 cache.context().gate().leave();
@@ -5812,6 +5913,58 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     /**
      *
      */
+    static class LoadKeysCallableV2<K, V> extends LoadKeysCallable<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private boolean keepBinary;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public LoadKeysCallableV2() {
+            // No-op.
+        }
+
+        /**
+         * @param cacheName Cache name.
+         * @param keys Keys.
+         * @param update If {@code true} calls {@link 
#localLoadAndUpdate(Collection)}
+         *        otherwise {@link #localLoad(Collection, ExpiryPolicy, 
boolean)}.
+         * @param plc Expiry policy.
+         * @param keepBinary Keep binary flag.
+         */
+        LoadKeysCallableV2(final String cacheName, final Collection<? extends 
K> keys, final boolean update,
+            final ExpiryPolicy plc, final boolean keepBinary) {
+            super(cacheName, keys, update, plc);
+
+            this.keepBinary = keepBinary;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() throws Exception {
+            return call0(keepBinary);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(final ObjectOutput out) throws 
IOException {
+            super.writeExternal(out);
+
+            out.writeBoolean(keepBinary);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(final ObjectInput in) throws 
IOException, ClassNotFoundException {
+            super.readExternal(in);
+
+            keepBinary = in.readBoolean();
+        }
+    }
+
+    /**
+     *
+     */
     private class LocalStoreLoadClosure extends CIX3<KeyCacheObject, Object, 
GridCacheVersion> {
         /** */
         final IgniteBiPredicate<K, V> p;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd6a67f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 5ff674f..9f15bf4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -442,10 +442,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
     }
 
     /** {@inheritDoc} */
-    @Override public void localLoad(Collection<? extends K> keys, final 
ExpiryPolicy plc)
+    @Override public void localLoad(Collection<? extends K> keys, final 
ExpiryPolicy plc, final boolean keepBinary)
         throws IgniteCheckedException {
         if (ctx.store().isLocal()) {
-            super.localLoad(keys, plc);
+            super.localLoad(keys, plc, keepBinary);
 
             return;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd6a67f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 7971173..dd66a33 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -273,8 +273,8 @@ public abstract class GridNearCacheAdapter<K, V> extends 
GridDistributedCacheAda
     }
 
     /** {@inheritDoc} */
-    @Override public void localLoad(Collection<? extends K> keys, ExpiryPolicy 
plc) throws IgniteCheckedException {
-        dht().localLoad(keys, plc);
+    @Override public void localLoad(Collection<? extends K> keys, ExpiryPolicy 
plc, boolean keepBinary) throws IgniteCheckedException {
+        dht().localLoad(keys, plc, keepBinary);
     }
 
     /** {@inheritDoc} */

Reply via email to