ignite-1607 WIP

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f880227a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f880227a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f880227a

Branch: refs/heads/ignite-1607
Commit: f880227a97beccb7e6d7e456ce61f189bf800299
Parents: 177ae71
Author: sboikov <[email protected]>
Authored: Fri Oct 16 10:39:08 2015 +0300
Committer: sboikov <[email protected]>
Committed: Fri Oct 16 12:44:06 2015 +0300

----------------------------------------------------------------------
 .../distributed/GridDistributedCacheEntry.java  |   6 +-
 .../dht/CacheDistributedGetFutureAdapter.java   |  70 ++-------
 .../distributed/dht/GridDhtCacheEntry.java      |   7 -
 .../distributed/dht/GridDhtTxPrepareFuture.java |  46 ++++--
 .../dht/GridPartitionedGetFuture.java           |  54 +++----
 .../dht/atomic/GridDhtAtomicCache.java          |   2 +-
 .../dht/colocated/GridDhtColocatedCache.java    |  39 ++---
 .../distributed/near/GridNearCacheAdapter.java  |   2 +-
 .../distributed/near/GridNearGetFuture.java     |  32 ++--
 ...arOptimisticSerializableTxPrepareFuture.java |  10 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |  33 +---
 .../near/GridNearTransactionalCache.java        |   7 +-
 .../cache/distributed/near/GridNearTxLocal.java |  68 ++++++---
 .../transactions/IgniteTxLocalAdapter.java      |  35 ++---
 .../cache/transactions/IgniteTxLocalEx.java     |   2 -
 .../CacheSerializableTransactionsTest.java      | 149 +++++++++++++++++++
 16 files changed, 342 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
index d564768..89045ab 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java
@@ -754,8 +754,8 @@ public class GridDistributedCacheEntry extends 
GridCacheMapEntry {
                 tx.xidVersion(),
                 tx.topologyVersion(),
                 timeout,
-                false,
-                true,
+                /*reenter*/false,
+                /*tx*/true,
                 tx.implicitSingle()) != null;
 
         try {
@@ -765,7 +765,7 @@ public class GridDistributedCacheEntry extends 
GridCacheMapEntry {
                 tx.threadId(),
                 tx.xidVersion(),
                 tx.timeout(),
-                true,
+                /*tx*/true,
                 tx.implicitSingle(),
                 tx.ownedVersion(txKey())
             );

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 4989a50..fc04126 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -27,13 +27,11 @@ import 
org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
-import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -95,7 +93,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> 
extends GridCompoun
     protected final boolean needVer;
 
     /** */
-    protected final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> 
resC;
+    protected final boolean keepCacheObjects;
 
     /**
      * @param cctx Context.
@@ -110,8 +108,8 @@ public abstract class CacheDistributedGetFutureAdapter<K, 
V> extends GridCompoun
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
      * @param canRemap Flag indicating whether future can be remapped on a 
newer topology version.
-     * @param resC Closure applied on 'get' result.
-     * @param needVer If {@code true} need provide entry version to result 
closure.
+     * @param needVer If {@code true} returns values as tuples containing 
value and version.
+     * @param keepCacheObjects Keep cache objects flag.
      */
     protected CacheDistributedGetFutureAdapter(
         GridCacheContext<K, V> cctx,
@@ -126,13 +124,11 @@ public abstract class CacheDistributedGetFutureAdapter<K, 
V> extends GridCompoun
         boolean skipVals,
         boolean canRemap,
         boolean needVer,
-        @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC
+        boolean keepCacheObjects
     ) {
-        super(cctx.kernalContext(),
-            resC != null ? new ResultClosureReducer<K, V>(keys.size()) : 
CU.<K, V>mapsReducer(keys.size()));
+        super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
 
         assert !F.isEmpty(keys);
-        assert !needVer || resC != null;
 
         this.cctx = cctx;
         this.keys = keys;
@@ -146,63 +142,23 @@ public abstract class CacheDistributedGetFutureAdapter<K, 
V> extends GridCompoun
         this.skipVals = skipVals;
         this.canRemap = canRemap;
         this.needVer = needVer;
-        this.resC = resC;
+        this.keepCacheObjects = keepCacheObjects;
 
         futId = IgniteUuid.randomUuid();
     }
 
     /**
+     * @param map Result map.
      * @param key Key.
      * @param val Value.
      * @param ver Version.
      */
     @SuppressWarnings("unchecked")
-    protected final void resultClosureValue(KeyCacheObject key, Object val, 
GridCacheVersion ver) {
-        assert resC != null;
-        assert val != null;
-        assert !needVer || ver != null;
+    protected final void versionedResult(Map map, KeyCacheObject key, Object 
val, GridCacheVersion ver) {
+        assert needVer;
+        assert skipVals || val != null;
+        assert ver != null;
 
-        ResultClosureReducer<K, V> rdc = (ResultClosureReducer)reducer();
-
-        assert rdc != null;
-
-        rdc.collect(key);
-
-        resC.apply(key, skipVals ? true : val, ver);
-    }
-
-    /**
-     *
-     */
-    private static class ResultClosureReducer<K, V> implements 
IgniteReducer<Map<K, V>, Map<K, V>>  {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private final ConcurrentHashMap8<KeyCacheObject, Boolean> map;
-
-        /**
-         * @param keys Number of keys.
-         */
-        public ResultClosureReducer(int keys) {
-            this.map = new ConcurrentHashMap8<>(keys);
-        }
-
-        /**
-         * @param key Key.
-         */
-        void collect(KeyCacheObject key) {
-            map.put(key, Boolean.TRUE);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean collect(@Nullable Map<K, V> map) {
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Map<K, V> reduce() {
-            return (Map)map;
-        }
+        map.put(key, new T2<>(skipVals ? true : val, ver));
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index c483e01..5d125ee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -204,13 +204,6 @@ public class GridDhtCacheEntry extends 
GridDistributedCacheEntry {
 
             checkObsolete();
 
-            if (serReadVer != null) {
-                unswap(false);
-
-                if (!checkSerializableReadVersion(serReadVer))
-                    return null;
-            }
-
             GridCacheMvcc mvcc = mvccExtras();
 
             if (mvcc == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index f25ee33..1e1a6b3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -818,12 +818,19 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
         this.writes = writes;
         this.txNodes = txNodes;
 
-        if (!F.isEmpty(writes)) {
+        boolean ser = tx.optimistic() && tx.serializable();
+
+        if (!F.isEmpty(writes) || (ser && !F.isEmpty(reads))) {
             Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
 
             for (IgniteTxEntry entry : writes)
                 forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
 
+            if (ser) {
+                for (IgniteTxEntry entry : reads)
+                    forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
+            }
+
             forceKeysFut = forceRebalanceKeys(forceKeys);
         }
 
@@ -852,7 +859,10 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
         IgniteTxEntry e,
         Map<Integer, Collection<KeyCacheObject>> map
     ) {
-        if (retVal || !F.isEmpty(e.entryProcessors()) || 
!F.isEmpty(e.filters())) {
+        if (retVal ||
+            !F.isEmpty(e.entryProcessors()) ||
+            !F.isEmpty(e.filters()) ||
+            e.serializableReadVersion() != null) {
             if (map == null)
                 map = new HashMap<>();
 
@@ -914,18 +924,32 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
      * @param entries Entries.
      * @return Not null exception if version check failed.
      */
-    @Nullable private IgniteTxOptimisticCheckedException 
checkReadConflict(Collection<IgniteTxEntry> entries) {
-        for (IgniteTxEntry entry : entries) {
-            GridCacheVersion serReadVer = entry.serializableReadVersion();
+    @Nullable private IgniteCheckedException 
checkReadConflict(Collection<IgniteTxEntry> entries) {
+        try {
+            for (IgniteTxEntry entry : entries) {
+                GridCacheVersion serReadVer = entry.serializableReadVersion();
 
-            if (serReadVer != null && 
!entry.cached().checkSerializableReadVersion(serReadVer)) {
-                GridCacheContext cctx = entry.context();
+                if (serReadVer != null) {
+                    entry.cached().unswap();
 
-                return new IgniteTxOptimisticCheckedException("Failed to 
prepare transaction, " +
-                    "read conflict [key=" + 
entry.key().value(cctx.cacheObjectContext(), false) +
-                    ", cache=" + cctx.name() + ']');
+                    if 
(!entry.cached().checkSerializableReadVersion(serReadVer)) {
+                        GridCacheContext cctx = entry.context();
+
+                        return new IgniteTxOptimisticCheckedException("Failed 
to prepare transaction, " +
+                            "read/write conflict [key=" + 
entry.key().value(cctx.cacheObjectContext(), false) +
+                            ", cache=" + cctx.name() + ']');
+                    }
+                }
             }
         }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to unswap entry: " + e, e);
+
+            return e;
+        }
+        catch (GridCacheEntryRemovedException e) {
+            assert false : "Got removed exception on entry with dht local 
candidate: " + entries;
+        }
 
         return null;
     }
@@ -936,7 +960,7 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
     private void prepare0() {
         try {
             if (tx.optimistic() && tx.serializable()) {
-                IgniteTxOptimisticCheckedException err0 = 
checkReadConflict(writes);
+                IgniteCheckedException err0 = checkReadConflict(writes);
 
                 if (err0 == null)
                     err0 = checkReadConflict(reads);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 18c6d69..efa2b1a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -37,7 +37,6 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import 
org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -48,7 +47,6 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLeanMap;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -96,8 +94,8 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
      * @param canRemap Flag indicating whether future can be remapped on a 
newer topology version.
-     * @param needVer If {@code true} need provide entry version to result 
closure.
-     * @param resC Closure applied on 'get' result.
+     * @param needVer If {@code true} returns values as tuples containing 
value and version.
+     * @param keepCacheObjects Keep cache objects flag.
      */
     public GridPartitionedGetFuture(
         GridCacheContext<K, V> cctx,
@@ -113,7 +111,7 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
         boolean skipVals,
         boolean canRemap,
         boolean needVer,
-        @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC
+        boolean keepCacheObjects
     ) {
         super(cctx,
             keys,
@@ -127,7 +125,7 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
             skipVals,
             canRemap,
             needVer,
-            resC);
+            keepCacheObjects);
 
         this.topVer = topVer;
 
@@ -264,7 +262,7 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
 
         final int keysSize = keys.size();
 
-        Map<K, V> locVals = resC == null ? U.<K, V>newHashMap(keysSize) : null;
+        Map<K, V> locVals = U.newHashMap(keysSize);
 
         boolean hasRmtNodes = false;
 
@@ -454,10 +452,16 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
                                     colocated.removeIfObsolete(key);
                             }
                             else {
-                                if (resC != null)
-                                    resultClosureValue(key, skipVals ? true : 
v, ver);
+                                if (needVer)
+                                    versionedResult(locVals, key, v, ver);
                                 else
-                                    cctx.addResult(locVals, key, v, skipVals, 
false, deserializePortable, true);
+                                    cctx.addResult(locVals,
+                                        key,
+                                        v,
+                                        skipVals,
+                                        keepCacheObjects,
+                                        deserializePortable,
+                                        true);
 
                                 return false;
                             }
@@ -550,24 +554,24 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
         int keysSize = infos.size();
 
         if (keysSize != 0) {
-            if (resC != null) {
-                for (GridCacheEntryInfo info : infos) {
-                    assert skipVals == (info.value() == null);
+            Map<K, V> map = new GridLeanMap<>(keysSize);
 
-                    resultClosureValue(info.key(), skipVals ? true : 
info.value(), info.version());
-                }
-            }
-            else {
-                Map<K, V> map = new GridLeanMap<>(keysSize);
-
-                for (GridCacheEntryInfo info : infos) {
-                    assert skipVals == (info.value() == null);
+            for (GridCacheEntryInfo info : infos) {
+                assert skipVals == (info.value() == null);
 
-                    cctx.addResult(map, info.key(), info.value(), skipVals, 
false, deserializePortable, false);
-                }
-
-                return map;
+                if (needVer)
+                    versionedResult(map, info.key(), info.value(), 
info.version());
+                else
+                    cctx.addResult(map,
+                        info.key(),
+                        info.value(),
+                        skipVals,
+                        keepCacheObjects,
+                        deserializePortable,
+                        false);
             }
+
+            return map;
         }
 
         return Collections.emptyMap();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d9840ec..8494410 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1040,7 +1040,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             skipVals,
             canRemap,
             false,
-            null);
+            false);
 
         fut.init();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 241cc07..e8aca71 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -290,7 +290,7 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
             skipVals,
             canRemap,
             false,
-            null);
+            false);
     }
 
     /**
@@ -319,7 +319,7 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
         boolean skipVals,
         boolean canRemap,
         boolean needVer,
-        @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
+        boolean keepCacheObject
     ) {
         if (keys == null || keys.isEmpty())
             return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -330,7 +330,6 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
         // Optimisation: try to resolve value locally and escape 'get future' 
creation.
         if (!reload && !forcePrimary) {
             Map<K, V> locVals = null;
-            Map<KeyCacheObject, T2<Object, GridCacheVersion>> locVals0 = null;
 
             boolean success = true;
 
@@ -391,18 +390,19 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
                                 success = false;
                             }
                             else {
-                                if (c != null) {
-                                    if (locVals0 == null)
-                                        locVals0 = U.newHashMap(keys.size());
-
-                                    locVals0.put(key, new 
T2<>((Object)(skipVals ? true : v), ver));
-                                }
-                                else {
-                                    if (locVals == null)
-                                        locVals = U.newHashMap(keys.size());
-
-                                    ctx.addResult(locVals, key, v, skipVals, 
false, deserializePortable, true);
-                                }
+                                if (locVals == null)
+                                    locVals = U.newHashMap(keys.size());
+
+                                if (needVer)
+                                    locVals.put((K)key, (V)new 
T2<>((Object)(skipVals ? true : v), ver));
+                                else
+                                    ctx.addResult(locVals,
+                                        key,
+                                        v,
+                                        skipVals,
+                                        keepCacheObject,
+                                        deserializePortable,
+                                        true);
                             }
                         }
                         else
@@ -436,13 +436,6 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
             if (success) {
                 sendTtlUpdateRequest(expiryPlc);
 
-                if (c != null) {
-                    if (locVals0 != null) {
-                        for (Map.Entry<KeyCacheObject, T2<Object, 
GridCacheVersion>> e : locVals0.entrySet())
-                            c.apply(e.getKey(), e.getValue().get1(), 
e.getValue().get2());
-                    }
-                }
-
                 return new GridFinishedFuture<>(locVals);
             }
         }
@@ -465,7 +458,7 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
             skipVals,
             canRemap,
             needVer,
-            c);
+            keepCacheObject);
 
         fut.init();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index d1adf1d..aec751e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -290,7 +290,7 @@ public abstract class GridNearCacheAdapter<K, V> extends 
GridDistributedCacheAda
             skipVal,
             canRemap,
             false,
-            null);
+            false);
 
         // init() will register future for responses if future has remote 
mappings.
         fut.init();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 43cc92a..61e09ad 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -101,8 +101,8 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
      * @param canRemap Flag indicating whether future can be remapped on a 
newer topology version.
-     * @param needVer If {@code true} need provide entry version to result 
closure.
-     * @param resC Closure applied on 'get' result.
+     * @param needVer If {@code true} returns values as tuples containing 
value and version.
+     * @param keepCacheObjects Keep cache objects flag.
      */
     public GridNearGetFuture(
         GridCacheContext<K, V> cctx,
@@ -118,7 +118,7 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
         boolean skipVals,
         boolean canRemap,
         boolean needVer,
-        @Nullable GridInClosure3<KeyCacheObject, Object, GridCacheVersion> resC
+        boolean keepCacheObjects
     ) {
         super(cctx,
             keys,
@@ -132,10 +132,9 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
             skipVals,
             canRemap,
             needVer,
-            resC);
+            keepCacheObjects);
 
         assert !F.isEmpty(keys);
-        assert !needVer || resC != null;
 
         this.tx = tx;
 
@@ -530,7 +529,12 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
                 }
 
                 if (v != null && !reload) {
-                    if (resC == null) {
+                    if (needVer) {
+                        V val0 = (V)new T2<>(skipVals ? true : v, ver);
+
+                        add(new 
GridFinishedFuture<>(Collections.singletonMap((K)key, val0)));
+                    }
+                    else {
                         K key0 = key.value(cctx.cacheObjectContext(), true);
                         V val0 = v.value(cctx.cacheObjectContext(), true);
 
@@ -539,8 +543,6 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
 
                         add(new 
GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
                     }
-                    else
-                        resultClosureValue(key, v, ver);
                 }
                 else {
                     if (affNode == null) {
@@ -661,7 +663,7 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
     ) {
         boolean empty = F.isEmpty(keys);
 
-        Map<K, V> map = (resC != null || empty) ? Collections.<K, V>emptyMap() 
: new GridLeanMap<K, V>(keys.size());
+        Map<K, V> map = empty ? Collections.<K, V>emptyMap() : new 
GridLeanMap<K, V>(keys.size());
 
         if (!empty) {
             boolean atomic = cctx.atomic();
@@ -699,10 +701,16 @@ public final class GridNearGetFuture<K, V> extends 
CacheDistributedGetFutureAdap
 
                     assert skipVals == (info.value() == null);
 
-                    if (resC != null)
-                        resultClosureValue(key, skipVals ? true : val, 
info.version());
+                    if (needVer)
+                        versionedResult(map, key, val, info.version());
                     else
-                        cctx.addResult(map, key, val, skipVals, false, 
deserializePortable, false);
+                        cctx.addResult(map,
+                            key,
+                            val,
+                            skipVals,
+                            keepCacheObjects,
+                            deserializePortable,
+                            false);
                 }
                 catch (GridCacheEntryRemovedException ignore) {
                     if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 36eef52..6836a81 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
@@ -78,6 +79,9 @@ import static 
org.apache.ignite.transactions.TransactionState.PREPARING;
 public class GridNearOptimisticSerializableTxPrepareFuture extends 
GridNearTxPrepareFutureAdapter
     implements GridCacheMvccFuture<IgniteInternalTx> {
     /** */
+    public static final IgniteProductVersion SER_TX_SINCE = 
IgniteProductVersion.fromString("1.5.0");
+
+    /** */
     @GridToStringExclude
     private KeyLockFuture keyLockFut = new KeyLockFuture();
 
@@ -104,7 +108,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearTxPre
         if (log.isDebugEnabled())
             log.debug("Transaction future received owner changed callback: " + 
entry);
 
-        if ((entry.context().isNear() || entry.context().isLocal()) && owner 
!= null && tx.hasWriteKey(entry.txKey())) {
+        if ((entry.context().isNear() || entry.context().isLocal())
+            && owner != null &&
+            tx.entry(entry.txKey()) != null) {
             keyLockFut.onKeyLocked(entry.txKey());
 
             return true;
@@ -477,7 +483,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearTxPre
             map(write, topVer, mappings, true, remap);
 
         for (IgniteTxEntry read : reads)
-            map(read, topVer, mappings, false, remap);
+            map(read, topVer, mappings, true, remap);
 
         keyLockFut.onAllKeysAdded();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 4dc8a84..9cd2478 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -450,10 +449,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
                 return;
             }
 
-            prepare(
-                Collections.<IgniteTxEntry>emptyList(),
-                tx.writeEntries(),
-                topLocked);
+            prepare(tx.writeEntries(), topLocked);
 
             markInitialized();
         }
@@ -466,13 +462,11 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
     }
 
     /**
-     * @param reads Read entries.
      * @param writes Write entries.
      * @param topLocked {@code True} if thread already acquired lock 
preventing topology change.
      * @throws IgniteCheckedException If failed.
      */
     private void prepare(
-        Iterable<IgniteTxEntry> reads,
         Iterable<IgniteTxEntry> writes,
         boolean topLocked
     ) throws IgniteCheckedException {
@@ -484,7 +478,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
 
         ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new 
ConcurrentLinkedDeque8<>();
 
-        if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
+        if (!F.isEmpty(writes)) {
             for (int cacheId : tx.activeCacheIds()) {
                 GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
 
@@ -500,25 +494,8 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
         // Assign keys to primary nodes.
         GridDistributedTxMapping cur = null;
 
-        for (IgniteTxEntry read : reads) {
-            GridDistributedTxMapping updated = map(read, topVer, cur, false, 
topLocked);
-
-            if (cur != updated) {
-                mappings.offer(updated);
-
-                if (updated.node().isLocal()) {
-                    if (read.context().isNear())
-                        tx.nearLocallyMapped(true);
-                    else if (read.context().isColocated())
-                        tx.colocatedLocallyMapped(true);
-                }
-
-                cur = updated;
-            }
-        }
-
         for (IgniteTxEntry write : writes) {
-            GridDistributedTxMapping updated = map(write, topVer, cur, true, 
topLocked);
+            GridDistributedTxMapping updated = map(write, topVer, cur, 
topLocked);
 
             if (cur != updated) {
                 mappings.offer(updated);
@@ -650,7 +627,6 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
      * @param entry Transaction entry.
      * @param topVer Topology version.
      * @param cur Current mapping.
-     * @param waitLock Wait lock flag.
      * @param topLocked {@code True} if thread already acquired lock 
preventing topology change.
      * @return Mapping.
      */
@@ -658,7 +634,6 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
         IgniteTxEntry entry,
         AffinityTopologyVersion topVer,
         @Nullable GridDistributedTxMapping cur,
-        boolean waitLock,
         boolean topLocked
     ) {
         GridCacheContext cacheCtx = entry.context();
@@ -686,7 +661,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
             entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
 
         if (cacheCtx.isNear() || cacheCtx.isLocal()) {
-            if (waitLock && entry.explicitVersion() == null)
+            if (entry.explicitVersion() == null)
                 lockKeys.add(entry.txKey());
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 7700f05..909b547 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -181,8 +181,7 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
         boolean deserializePortable,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
         boolean skipVals,
-        boolean needVer,
-        final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c) {
+        boolean needVer) {
         assert tx != null;
 
         GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
@@ -196,9 +195,9 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
             deserializePortable,
             expiryPlc,
             skipVals,
-            needVer,
             /*can remap*/true,
-            c);
+            needVer,
+            true);
 
         // init() will register future for responses if it has remote mappings.
         fut.init();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 5b2d50c..ad5c9d9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -60,10 +60,10 @@ import 
org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -348,30 +348,23 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter {
         boolean readThrough,
         boolean async,
         final Collection<KeyCacheObject> keys,
-        boolean deserializePortable,
         boolean skipVals,
-        boolean needVer,
+        final boolean needVer,
         final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
     ) {
         if (cacheCtx.isNear()) {
             return cacheCtx.nearTx().txLoadAsync(this,
                 keys,
                 readThrough,
-                deserializePortable,
+                /*deserializePortable*/false,
                 accessPolicy(cacheCtx, keys),
                 skipVals,
-                needVer,
-                c).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, 
Void>() {
+                needVer).chain(new C1<IgniteInternalFuture<Map<Object, 
Object>>, Void>() {
                 @Override public Void apply(IgniteInternalFuture<Map<Object, 
Object>> f) {
                     try {
                         Map<Object, Object> map = f.get();
 
-                        if (map != null && map.size() != keys.size()) {
-                            for (KeyCacheObject key : keys) {
-                                if (!map.containsKey(key))
-                                    c.apply(key, null, 
IgniteTxEntry.READ_NEW_ENTRY_VER);
-                            }
-                        }
+                        processLoaded(map, keys, needVer, c);
 
                         return null;
                     }
@@ -392,23 +385,18 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter {
                 topologyVersion(),
                 CU.subjectId(this, cctx),
                 resolveTaskName(),
-                deserializePortable,
+                /*deserializePortable*/false,
                 accessPolicy(cacheCtx, keys),
                 skipVals,
                 /*can remap*/true,
                 needVer,
-                c
+                /*keepCacheObject*/true
             ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
                 @Override public Void apply(IgniteInternalFuture<Map<Object, 
Object>> f) {
                     try {
                         Map<Object, Object> map = f.get();
 
-                        if (map != null && map.size() != keys.size()) {
-                            for (KeyCacheObject key : keys) {
-                                if (!map.containsKey(key))
-                                    c.apply(key, null, 
IgniteTxEntry.READ_NEW_ENTRY_VER);
-                            }
-                        }
+                        processLoaded(map, keys, needVer, c);
 
                         return null;
                     }
@@ -422,7 +410,43 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter 
{
         } else {
             assert cacheCtx.isLocal();
 
-            return super.loadMissing(cacheCtx, readThrough, async, keys, 
deserializePortable, skipVals, needVer, c);
+            return super.loadMissing(cacheCtx, readThrough, async, keys, 
skipVals, needVer, c);
+        }
+    }
+
+    /**
+     * @param map Loaded values.
+     * @param keys Keys.
+     * @param needVer If {@code true} version is required for loaded values.
+     * @param c Closure.
+     */
+    private void processLoaded(
+        Map<Object, Object> map,
+        final Collection<KeyCacheObject> keys,
+        boolean needVer,
+        GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c) {
+        for (KeyCacheObject key : keys) {
+            Object val = map.get(key);
+
+            if (val != null) {
+                Object v;
+                GridCacheVersion ver;
+
+                if (needVer) {
+                    T2<Object, GridCacheVersion> t = (T2)val;
+
+                    v = t.get1();
+                    ver = t.get2();
+                }
+                else {
+                    v = val;
+                    ver = null;
+                }
+
+                c.apply(key, v, ver);
+            }
+            else
+                c.apply(key, null, IgniteTxEntry.READ_NEW_ENTRY_VER);
         }
     }
 
@@ -632,7 +656,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         Collection<GridCacheVersion> committedVers,
         Collection<GridCacheVersion> rolledbackVers)
     {
-        Collection<IgniteTxEntry> entries = F.concat(false, mapping.reads(), 
mapping.writes());
+        Collection<IgniteTxEntry> entries = F.concat(false, mapping.writes(), 
mapping.reads());
 
         for (IgniteTxEntry txEntry : entries) {
             while (true) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 51a3316..ccf7394 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -421,7 +421,6 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
         final boolean readThrough,
         boolean async,
         final Collection<KeyCacheObject> keys,
-        boolean deserializePortable,
         boolean skipVals,
         boolean needVer,
         final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
@@ -1621,7 +1620,6 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
                 !skipStore,
                 false,
                 missedMap.keySet(),
-                deserializePortable,
                 skipVals,
                 needReadVer,
                 new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() 
{
@@ -1658,15 +1656,13 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
                             cacheCtx.evicts().touch(e, topologyVersion());
 
                             if (visibleVal != null) {
-                                synchronized (map) {
-                                    cacheCtx.addResult(map,
-                                        key,
-                                        visibleVal,
-                                        skipVals,
-                                        keepCacheObjects,
-                                        deserializePortable,
-                                        false);
-                                }
+                                cacheCtx.addResult(map,
+                                    key,
+                                    visibleVal,
+                                    skipVals,
+                                    keepCacheObjects,
+                                    deserializePortable,
+                                    false);
                             }
                         }
                         else {
@@ -1681,15 +1677,13 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
                             }
 
                             if (visibleVal != null) {
-                                synchronized (map) {
-                                    cacheCtx.addResult(map,
-                                        key,
-                                        visibleVal,
-                                        skipVals,
-                                        keepCacheObjects,
-                                        deserializePortable,
-                                        false);
-                                }
+                                cacheCtx.addResult(map,
+                                    key,
+                                    visibleVal,
+                                    skipVals,
+                                    keepCacheObjects,
+                                    deserializePortable,
+                                    false);
                             }
                         }
                     }
@@ -2367,7 +2361,6 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
                 /*read through*/cacheCtx.config().isLoadPreviousValue() && 
!skipStore,
                 /*async*/true,
                 missedForLoad,
-                deserializePortables(cacheCtx),
                 skipVals,
                 needReadVer,
                 new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() 
{

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 1530aeb..bdea971 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -156,7 +156,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
      * @param readThrough Read through flag.
      * @param async if {@code True}, then loading will happen in a separate 
thread.
      * @param keys Keys.
-     * @param deserializePortable Deserialize portable flag.
      * @param skipVals Skip values flag.
      * @param needVer If {@code true} version is required for loaded values.
      * @param c Closure to be applied for loaded values.
@@ -167,7 +166,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
         boolean readThrough,
         boolean async,
         Collection<KeyCacheObject> keys,
-        boolean deserializePortable,
         boolean skipVals,
         boolean needVer,
         GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f880227a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 0c9debf..4f6317d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -70,6 +71,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.testframework.GridTestUtils.TestMemoryMode;
 import static 
org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
@@ -3037,6 +3039,153 @@ public class CacheSerializableTransactionsTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testNoOptimisticExceptionChangingTopology() throws Exception {
+        if (FAST)
+            return;
+
+        final AtomicBoolean finished = new AtomicBoolean();
+
+        final List<String> cacheNames = new ArrayList<>();
+
+        Ignite srv = ignite(1);
+
+        try {
+            {
+                CacheConfiguration<Integer, Integer> ccfg = 
cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false);
+                ccfg.setName("cache1");
+                ccfg.setRebalanceMode(SYNC);
+
+                srv.createCache(ccfg);
+
+                cacheNames.add(ccfg.getName());
+            }
+
+            {
+                // Store enabled.
+                CacheConfiguration<Integer, Integer> ccfg = 
cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false);
+                ccfg.setName("cache2");
+                ccfg.setRebalanceMode(SYNC);
+
+                srv.createCache(ccfg);
+
+                cacheNames.add(ccfg.getName());
+            }
+
+            {
+                // Offheap.
+                CacheConfiguration<Integer, Integer> ccfg = 
cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false);
+                ccfg.setName("cache3");
+                ccfg.setRebalanceMode(SYNC);
+
+                GridTestUtils.setMemoryMode(null, ccfg, 
TestMemoryMode.OFFHEAP_TIERED, 1, 64);
+
+                srv.createCache(ccfg);
+
+                cacheNames.add(ccfg.getName());
+            }
+
+            IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new 
Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    while (!finished.get()) {
+                        stopGrid(0);
+
+                        U.sleep(300);
+
+                        Ignite ignite = startGrid(0);
+
+                        assertFalse(ignite.configuration().isClientMode());
+                    }
+
+                    return null;
+                }
+            });
+
+            List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+            final int KEYS_PER_THREAD = 100;
+
+            for (int i = 1; i < SRVS + CLIENTS; i++) {
+                final Ignite node = ignite(i);
+
+                final int minKey = i * KEYS_PER_THREAD;
+                final int maxKey = minKey + KEYS_PER_THREAD;
+
+                // Threads update non-intersecting keys, optimistic exception 
should not be thrown.
+
+                futs.add(GridTestUtils.runAsync(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        try {
+                            log.info("Started update thread [node=" + 
node.name() +
+                                ", minKey=" + minKey +
+                                ", maxKey=" + maxKey + ']');
+
+                            final ThreadLocalRandom rnd = 
ThreadLocalRandom.current();
+
+                            List<IgniteCache<Integer, Integer>> caches = new 
ArrayList<>();
+
+                            for (String cacheName : cacheNames)
+                                caches.add(node.<Integer, 
Integer>cache(cacheName));
+
+                            assertEquals(3, caches.size());
+
+                            int iter = 0;
+
+                            while (!finished.get()) {
+                                int keyCnt = rnd.nextInt(1, 10);
+
+                                final Set<Integer> keys = new 
LinkedHashSet<>();
+
+                                while (keys.size() < keyCnt)
+                                    keys.add(rnd.nextInt(minKey, maxKey));
+
+                                for (final IgniteCache<Integer, Integer> cache 
: caches) {
+                                    doInTransaction(node, OPTIMISTIC, 
SERIALIZABLE, new Callable<Void>() {
+                                        @Override public Void call() throws 
Exception {
+                                            for (Integer key : keys)
+                                                randomOperation(rnd, cache, 
key);
+
+                                            return null;
+                                        }
+                                    });
+                                }
+
+                                if (iter % 100 == 0)
+                                    log.info("Iteration: " + iter);
+
+                                iter++;
+                            }
+
+                            return null;
+                        }
+                        catch (Throwable e) {
+                            log.error("Unexpected error: " + e, e);
+
+                            throw e;
+                        }
+                    }
+                }, "update-thread-" + i));
+            }
+
+            U.sleep(60_000);
+
+            finished.set(true);
+
+            restartFut.get();
+
+            for (IgniteInternalFuture<?> fut : futs)
+                fut.get();
+        }
+        finally {
+            for (String cacheName : cacheNames)
+                srv.destroyCache(cacheName);
+
+            finished.set(true);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testConflictResolution() throws Exception {
         final Ignite ignite = ignite(0);
 

Reply via email to