http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 6db00ab..af43113 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,17 +35,14 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
-import 
org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
 import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -57,7 +53,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.transactions.TransactionOptimisticException;
 import org.apache.ignite.transactions.TransactionTimeoutException;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
@@ -69,7 +64,7 @@ import static 
org.apache.ignite.transactions.TransactionState.PREPARING;
 /**
  *
  */
-public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAdapter
+public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepareFutureAdapter
     implements GridCacheMvccFuture<IgniteInternalTx> {
     /** */
     @GridToStringInclude
@@ -82,7 +77,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
     public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, 
GridNearTxLocal tx) {
         super(cctx, tx);
 
-        assert tx.optimistic() : tx;
+        assert tx.optimistic() && !tx.serializable() : tx;
     }
 
     /** {@inheritDoc} */
@@ -139,11 +134,9 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
     }
 
     /**
-     * @param nodeId Failed node ID.
-     * @param mappings Remaining mappings.
      * @param e Error.
      */
-    void onError(@Nullable UUID nodeId, @Nullable 
Iterable<GridDistributedTxMapping> mappings, Throwable e) {
+    void onError(Throwable e) {
         if (X.hasCause(e, ClusterTopologyCheckedException.class) || 
X.hasCause(e, ClusterTopologyException.class)) {
             if (tx.onePhaseCommit()) {
                 tx.markForBackupCheck();
@@ -157,12 +150,6 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
         if (err.compareAndSet(null, e)) {
             boolean marked = tx.setRollbackOnly();
 
-            if (e instanceof IgniteTxOptimisticCheckedException) {
-                assert nodeId != null : "Missing node ID for optimistic 
failure exception: " + e;
-
-                tx.removeKeysMapping(nodeId, mappings);
-            }
-
             if (e instanceof IgniteTxRollbackCheckedException) {
                 if (marked) {
                     try {
@@ -199,7 +186,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
     /** {@inheritDoc} */
     @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) 
{
         if (!isDone()) {
-            for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
+            for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : 
pending()) {
                 if (isMini(fut)) {
                     MiniFuture f = (MiniFuture)fut;
 
@@ -253,212 +240,38 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
         return false;
     }
 
-    /** {@inheritDoc} */
-    @Override public void prepare() {
-        // Obtain the topology version to use.
-        AffinityTopologyVersion topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
-
-        // If there is another system transaction in progress, use it's 
topology version to prevent deadlock.
-        if (topVer == null && tx != null && tx.system()) {
-            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
-
-            if (tx0 != null)
-                topVer = tx0.topologyVersionSnapshot();
-        }
-
-        if (topVer != null) {
-            tx.topologyVersion(topVer);
-
-            cctx.mvcc().addFuture(this);
-
-            prepare0(false, true);
-
-            return;
-        }
-
-        prepareOnTopology(false, null);
-    }
-
-    /**
-     * @param remap Remap flag.
-     * @param c Optional closure to run after map.
-     */
-    private void prepareOnTopology(final boolean remap, @Nullable final 
Runnable c) {
-        GridDhtTopologyFuture topFut = topologyReadLock();
-
-        AffinityTopologyVersion topVer = null;
-
-        try {
-            if (topFut == null) {
-                assert isDone();
-
-                return;
-            }
-
-            if (topFut.isDone()) {
-                topVer = topFut.topologyVersion();
-
-                if (remap)
-                    tx.onRemap(topVer);
-                else
-                    tx.topologyVersion(topVer);
-
-                if (!remap)
-                    cctx.mvcc().addFuture(this);
-            }
-        }
-        finally {
-            topologyReadUnlock();
-        }
-
-        if (topVer != null) {
-            StringBuilder invalidCaches = null;
-
-            for (Integer cacheId : tx.activeCacheIds()) {
-                GridCacheContext ctx = cctx.cacheContext(cacheId);
-
-                assert ctx != null : cacheId;
-
-                Throwable err = topFut.validateCache(ctx);
-
-                if (err != null) {
-                    if (invalidCaches != null)
-                        invalidCaches.append(", ");
-                    else
-                        invalidCaches = new StringBuilder();
-
-                    invalidCaches.append(U.maskName(ctx.name()));
-                }
-            }
-
-            if (invalidCaches != null) {
-                onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache topology is not valid): " +
-                    invalidCaches.toString()));
-
-                return;
-            }
-
-            prepare0(remap, false);
-
-            if (c != null)
-                c.run();
-        }
-        else {
-            topFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
-                @Override public void apply(final 
IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                    cctx.kernalContext().closure().runLocalSafe(new 
GridPlainRunnable() {
-                        @Override public void run() {
-                            try {
-                                fut.get();
-
-                                prepareOnTopology(remap, c);
-                            }
-                            catch (IgniteCheckedException e) {
-                                onDone(e);
-                            }
-                            finally {
-                                cctx.txContextReset();
-                            }
-                        }
-                    });
-                }
-            });
-        }
-    }
-
-    /**
-     * Acquires topology read lock.
-     *
-     * @return Topology ready future.
-     */
-    private GridDhtTopologyFuture topologyReadLock() {
-        if (tx.activeCacheIds().isEmpty())
-            return cctx.exchange().lastTopologyFuture();
-
-        GridCacheContext<?, ?> nonLocCtx = null;
-
-        for (int cacheId : tx.activeCacheIds()) {
-            GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
-            if (!cacheCtx.isLocal()) {
-                nonLocCtx = cacheCtx;
-
-                break;
-            }
-        }
-
-        if (nonLocCtx == null)
-            return cctx.exchange().lastTopologyFuture();
-
-        nonLocCtx.topology().readLock();
-
-        if (nonLocCtx.topology().stopping()) {
-            onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache is stopped): " +
-                nonLocCtx.name()));
-
-            return null;
-        }
-
-        return nonLocCtx.topology().topologyVersionFuture();
-    }
-
-    /**
-     * Releases topology read lock.
-     */
-    private void topologyReadUnlock() {
-        if (!tx.activeCacheIds().isEmpty()) {
-            GridCacheContext<?, ?> nonLocCtx = null;
-
-            for (int cacheId : tx.activeCacheIds()) {
-                GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
-                if (!cacheCtx.isLocal()) {
-                    nonLocCtx = cacheCtx;
-
-                    break;
-                }
-            }
-
-            if (nonLocCtx != null)
-                nonLocCtx.topology().readUnlock();
-        }
-    }
-
     /**
      * Initializes future.
      *
      * @param remap Remap flag.
      * @param topLocked {@code True} if thread already acquired lock 
preventing topology change.
      */
-    private void prepare0(boolean remap, boolean topLocked) {
+    @Override protected void prepare0(boolean remap, boolean topLocked) {
         try {
             boolean txStateCheck = remap ? tx.state() == PREPARING : 
tx.state(PREPARING);
 
             if (!txStateCheck) {
                 if (tx.setRollbackOnly()) {
                     if (tx.timedOut())
-                        onError(null, null, new 
IgniteTxTimeoutCheckedException("Transaction timed out and " +
+                        onError(new 
IgniteTxTimeoutCheckedException("Transaction timed out and " +
                             "was rolled back: " + this));
                     else
-                        onError(null, null, new 
IgniteCheckedException("Invalid transaction state for prepare " +
+                        onError(new IgniteCheckedException("Invalid 
transaction state for prepare " +
                             "[state=" + tx.state() + ", tx=" + this + ']'));
                 }
                 else
-                    onError(null, null, new 
IgniteTxRollbackCheckedException("Invalid transaction state for " +
+                    onError(new IgniteTxRollbackCheckedException("Invalid 
transaction state for " +
                         "prepare [state=" + tx.state() + ", tx=" + this + 
']'));
 
                 return;
             }
 
-            prepare(
-                tx.optimistic() && tx.serializable() ? tx.readEntries() : 
Collections.<IgniteTxEntry>emptyList(),
-                tx.writeEntries(),
-                topLocked);
+            prepare(tx.writeEntries(), topLocked);
 
             markInitialized();
         }
-        catch (TransactionTimeoutException | TransactionOptimisticException e) 
{
-            onError(cctx.localNodeId(), null, e);
+        catch (TransactionTimeoutException e) {
+            onError( e);
         }
         catch (IgniteCheckedException e) {
             onDone(e);
@@ -466,13 +279,11 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
     }
 
     /**
-     * @param reads Read entries.
      * @param writes Write entries.
      * @param topLocked {@code True} if thread already acquired lock 
preventing topology change.
      * @throws IgniteCheckedException If failed.
      */
     private void prepare(
-        Iterable<IgniteTxEntry> reads,
         Iterable<IgniteTxEntry> writes,
         boolean topLocked
     ) throws IgniteCheckedException {
@@ -484,7 +295,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
 
         ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new 
ConcurrentLinkedDeque8<>();
 
-        if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
+        if (!F.isEmpty(writes)) {
             for (int cacheId : tx.activeCacheIds()) {
                 GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
 
@@ -500,25 +311,8 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
         // Assign keys to primary nodes.
         GridDistributedTxMapping cur = null;
 
-        for (IgniteTxEntry read : reads) {
-            GridDistributedTxMapping updated = map(read, topVer, cur, false, 
topLocked);
-
-            if (cur != updated) {
-                mappings.offer(updated);
-
-                if (updated.node().isLocal()) {
-                    if (read.context().isNear())
-                        tx.nearLocallyMapped(true);
-                    else if (read.context().isColocated())
-                        tx.colocatedLocallyMapped(true);
-                }
-
-                cur = updated;
-            }
-        }
-
         for (IgniteTxEntry write : writes) {
-            GridDistributedTxMapping updated = map(write, topVer, cur, true, 
topLocked);
+            GridDistributedTxMapping updated = map(write, topVer, cur, 
topLocked);
 
             if (cur != updated) {
                 mappings.offer(updated);
@@ -576,7 +370,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
             futId,
             tx.topologyVersion(),
             tx,
-            tx.optimistic() && tx.serializable() ? m.reads() : null,
+            null,
             m.writes(),
             m.near(),
             txMapping.transactionNodes(),
@@ -604,7 +398,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
                 tx.userPrepare();
             }
             catch (IgniteCheckedException e) {
-                onError(null, null, e);
+                onError(e);
             }
         }
 
@@ -651,7 +445,6 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
      * @param entry Transaction entry.
      * @param topVer Topology version.
      * @param cur Current mapping.
-     * @param waitLock Wait lock flag.
      * @param topLocked {@code True} if thread already acquired lock 
preventing topology change.
      * @return Mapping.
      */
@@ -659,7 +452,6 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
         IgniteTxEntry entry,
         AffinityTopologyVersion topVer,
         @Nullable GridDistributedTxMapping cur,
-        boolean waitLock,
         boolean topLocked
     ) {
         GridCacheContext cacheCtx = entry.context();
@@ -687,7 +479,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
             entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
 
         if (cacheCtx.isNear() || cacheCtx.isLocal()) {
-            if (waitLock && entry.explicitVersion() == null)
+            if (entry.explicitVersion() == null)
                 lockKeys.add(entry.txKey());
         }
 
@@ -749,7 +541,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
     /**
      *
      */
-    private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+    private class MiniFuture extends 
GridFutureAdapter<GridNearTxPrepareResponse> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -828,7 +620,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
 
                 // Fail the whole future (make sure not to remap on different 
primary node
                 // to prevent multiple lock coordinators).
-                onError(null, null, e);
+                onError(e);
             }
         }
 
@@ -836,14 +628,14 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
          * @param nodeId Failed node ID.
          * @param res Result callback.
          */
-        void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
+        void onResult(UUID nodeId, final GridNearTxPrepareResponse res) {
             if (isDone())
                 return;
 
             if (rcvRes.compareAndSet(false, true)) {
                 if (res.error() != null) {
                     // Fail the whole compound future.
-                    onError(nodeId, mappings, res.error());
+                    onError(res.error());
                 }
                 else {
                     if (res.clientRemapVersion() != null) {
@@ -877,7 +669,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
                             proceedPrepare(mappings);
 
                         // Finish this mini future.
-                        onDone(tx);
+                        onDone((GridNearTxPrepareResponse)null);
                     }
                 }
             }
@@ -889,7 +681,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd
         private void remap() {
             prepareOnTopology(true, new Runnable() {
                 @Override public void run() {
-                    onDone(tx);
+                    onDone((GridNearTxPrepareResponse)null);
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
new file mode 100644
index 0000000..fd9183e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class GridNearOptimisticTxPrepareFutureAdapter extends 
GridNearTxPrepareFutureAdapter {
+    /**
+     * @param cctx Context.
+     * @param tx Transaction.
+     */
+    public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext 
cctx, GridNearTxLocal tx) {
+        super(cctx, tx);
+
+        assert tx.optimistic() : tx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void prepare() {
+        // Obtain the topology version to use.
+        AffinityTopologyVersion topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+
+        // If there is another system transaction in progress, use it's 
topology version to prevent deadlock.
+        if (topVer == null && tx != null && tx.system()) {
+            IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+
+            if (tx0 != null)
+                topVer = tx0.topologyVersionSnapshot();
+        }
+
+        if (topVer != null) {
+            tx.topologyVersion(topVer);
+
+            cctx.mvcc().addFuture(this);
+
+            prepare0(false, true);
+
+            return;
+        }
+
+        prepareOnTopology(false, null);
+    }
+
+    /**
+     * Acquires topology read lock.
+     *
+     * @return Topology ready future.
+     */
+    protected final GridDhtTopologyFuture topologyReadLock() {
+        if (tx.activeCacheIds().isEmpty())
+            return cctx.exchange().lastTopologyFuture();
+
+        GridCacheContext<?, ?> nonLocCtx = null;
+
+        for (int cacheId : tx.activeCacheIds()) {
+            GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+            if (!cacheCtx.isLocal()) {
+                nonLocCtx = cacheCtx;
+
+                break;
+            }
+        }
+
+        if (nonLocCtx == null)
+            return cctx.exchange().lastTopologyFuture();
+
+        nonLocCtx.topology().readLock();
+
+        if (nonLocCtx.topology().stopping()) {
+            onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache is stopped): " +
+                nonLocCtx.name()));
+
+            return null;
+        }
+
+        return nonLocCtx.topology().topologyVersionFuture();
+    }
+
+    /**
+     * Releases topology read lock.
+     */
+    protected final void topologyReadUnlock() {
+        if (!tx.activeCacheIds().isEmpty()) {
+            GridCacheContext<?, ?> nonLocCtx = null;
+
+            for (int cacheId : tx.activeCacheIds()) {
+                GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+                if (!cacheCtx.isLocal()) {
+                    nonLocCtx = cacheCtx;
+
+                    break;
+                }
+            }
+
+            if (nonLocCtx != null)
+                nonLocCtx.topology().readUnlock();
+        }
+    }
+
+    /**
+     * @param remap Remap flag.
+     * @param c Optional closure to run after map.
+     */
+    protected final void prepareOnTopology(final boolean remap, @Nullable 
final Runnable c) {
+        GridDhtTopologyFuture topFut = topologyReadLock();
+
+        AffinityTopologyVersion topVer = null;
+
+        try {
+            if (topFut == null) {
+                assert isDone();
+
+                return;
+            }
+
+            if (topFut.isDone()) {
+                topVer = topFut.topologyVersion();
+
+                if (remap)
+                    tx.onRemap(topVer);
+                else
+                    tx.topologyVersion(topVer);
+
+                if (!remap)
+                    cctx.mvcc().addFuture(this);
+            }
+        }
+        finally {
+            topologyReadUnlock();
+        }
+
+        if (topVer != null) {
+            StringBuilder invalidCaches = null;
+
+            for (Integer cacheId : tx.activeCacheIds()) {
+                GridCacheContext ctx = cctx.cacheContext(cacheId);
+
+                assert ctx != null : cacheId;
+
+                Throwable err = topFut.validateCache(ctx);
+
+                if (err != null) {
+                    if (invalidCaches != null)
+                        invalidCaches.append(", ");
+                    else
+                        invalidCaches = new StringBuilder();
+
+                    invalidCaches.append(U.maskName(ctx.name()));
+                }
+            }
+
+            if (invalidCaches != null) {
+                onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache topology is not valid): " +
+                    invalidCaches.toString()));
+
+                return;
+            }
+
+            prepare0(remap, false);
+
+            if (c != null)
+                c.run();
+        }
+        else {
+            topFut.listen(new 
CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                @Override public void apply(final 
IgniteInternalFuture<AffinityTopologyVersion> fut) {
+                    cctx.kernalContext().closure().runLocalSafe(new 
GridPlainRunnable() {
+                        @Override public void run() {
+                            try {
+                                fut.get();
+
+                                prepareOnTopology(remap, c);
+                            }
+                            catch (IgniteCheckedException e) {
+                                onDone(e);
+                            }
+                            finally {
+                                cctx.txContextReset();
+                            }
+                        }
+                    });
+                }
+            });
+        }
+    }
+
+    /**
+     * @param remap Remap flag.
+     * @param topLocked {@code True} if thread already acquired lock 
preventing topology change.
+     */
+    protected abstract void prepare0(boolean remap, boolean topLocked);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 62f9bb3..11d31b2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -103,7 +103,7 @@ public class GridNearPessimisticTxPrepareFuture extends 
GridNearTxPrepareFutureA
         if (!isDone()) {
             assert res.clientRemapVersion() == null : res;
 
-            for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
+            for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : 
pending()) {
                 MiniFuture f = (MiniFuture)fut;
 
                 if (f.futureId().equals(res.miniId())) {
@@ -292,7 +292,7 @@ public class GridNearPessimisticTxPrepareFuture extends 
GridNearTxPrepareFutureA
     /**
      *
      */
-    private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+    private class MiniFuture extends 
GridFutureAdapter<GridNearTxPrepareResponse> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -332,7 +332,7 @@ public class GridNearPessimisticTxPrepareFuture extends 
GridNearTxPrepareFutureA
             else {
                 onPrepareResponse(m, res);
 
-                onDone(tx);
+                onDone(res);
             }
         }
 
@@ -344,7 +344,7 @@ public class GridNearPessimisticTxPrepareFuture extends 
GridNearTxPrepareFutureA
                 tx.markForBackupCheck();
 
                 // Do not fail future for one-phase transaction right away.
-                onDone(tx);
+                onDone((GridNearTxPrepareResponse)null);
             }
 
             onError(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index c3bb324..0e8aa0d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -117,7 +117,6 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
         @Nullable final Collection<? extends K> keys,
         boolean forcePrimary,
         boolean skipTx,
-        @Nullable final GridCacheEntryEx entry,
         @Nullable UUID subjId,
         String taskName,
         final boolean deserializePortable,
@@ -143,7 +142,6 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
                 @Override public IgniteInternalFuture<Map<K, V>> 
op(IgniteTxLocalAdapter tx) {
                     return tx.getAllAsync(ctx,
                         ctx.cacheKeysView(keys),
-                        entry,
                         deserializePortable,
                         skipVals,
                         false,
@@ -156,7 +154,6 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
 
         return loadAsync(null,
             ctx.cacheKeysView(keys),
-            false,
             forcePrimary,
             subjId,
             taskName,
@@ -174,6 +171,7 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
      * @param deserializePortable Deserialize portable flag.
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
+     * @param needVer If {@code true} returns values as tuples containing 
value and version.
      * @return Future.
      */
     IgniteInternalFuture<Map<K, V>> txLoadAsync(GridNearTxLocal tx,
@@ -181,21 +179,23 @@ public class GridNearTransactionalCache<K, V> extends 
GridNearCacheAdapter<K, V>
         boolean readThrough,
         boolean deserializePortable,
         @Nullable IgniteCacheExpiryPolicy expiryPlc,
-        boolean skipVals) {
+        boolean skipVals,
+        boolean needVer) {
         assert tx != null;
 
         GridNearGetFuture<K, V> fut = new GridNearGetFuture<>(ctx,
             keys,
             readThrough,
-            false,
-            false,
+            /*force primary*/needVer,
             tx,
             CU.subjectId(tx, ctx.shared()),
             tx.resolveTaskName(),
             deserializePortable,
             expiryPlc,
             skipVals,
-            /*can remap*/true);
+            /*can remap*/true,
+            needVer,
+            /*keepCacheObjects*/true);
 
         // init() will register future for responses if it has remote mappings.
         fut.init();

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 1a4f130..46c9f3e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -266,9 +266,11 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCompoundIdentityFutu
             }
 
             if (tx.onePhaseCommit()) {
-                finishOnePhase();
+                boolean commit = this.commit && err == null;
 
-                tx.tmFinish(commit && err == null);
+                finishOnePhase(commit);
+
+                tx.tmFinish(commit);
             }
 
             Throwable th = this.err.get();
@@ -510,9 +512,9 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCompoundIdentityFutu
     }
 
     /**
-     *
+     * @param commit Commit flag.
      */
-    private void finishOnePhase() {
+    private void finishOnePhase(boolean commit) {
         // No need to send messages as transaction was already committed on 
remote node.
         // Finish local mapping only as we need send commit message to backups.
         for (GridDistributedTxMapping m : mappings.values()) {
@@ -522,6 +524,8 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCompoundIdentityFutu
                 // Add new future.
                 if (fut != null)
                     add(fut);
+
+                break;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index ea96649..883c285 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -55,14 +55,15 @@ import 
org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -342,32 +343,30 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Boolean> loadMissing(
+    @Override public IgniteInternalFuture<Void> loadMissing(
         final GridCacheContext cacheCtx,
         boolean readThrough,
         boolean async,
         final Collection<KeyCacheObject> keys,
-        boolean deserializePortable,
         boolean skipVals,
-        final IgniteBiInClosure<KeyCacheObject, Object> c
+        final boolean needVer,
+        final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
     ) {
         if (cacheCtx.isNear()) {
             return cacheCtx.nearTx().txLoadAsync(this,
                 keys,
                 readThrough,
-                deserializePortable,
+                /*deserializePortable*/false,
                 accessPolicy(cacheCtx, keys),
-                skipVals).chain(new C1<IgniteInternalFuture<Map<Object, 
Object>>, Boolean>() {
-                @Override public Boolean 
apply(IgniteInternalFuture<Map<Object, Object>> f) {
+                skipVals,
+                needVer).chain(new C1<IgniteInternalFuture<Map<Object, 
Object>>, Void>() {
+                @Override public Void apply(IgniteInternalFuture<Map<Object, 
Object>> f) {
                     try {
                         Map<Object, Object> map = f.get();
 
-                        // Must loop through keys, not map entries,
-                        // as map entries may not have all the keys.
-                        for (KeyCacheObject key : keys)
-                            c.apply(key, 
map.get(key.value(cacheCtx.cacheObjectContext(), false)));
+                        processLoaded(map, keys, needVer, c);
 
-                        return true;
+                        return null;
                     }
                     catch (Exception e) {
                         setRollbackOnly();
@@ -381,39 +380,73 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter {
             return cacheCtx.colocated().loadAsync(
                 keys,
                 readThrough,
-                /*reload*/false,
-                /*force primary*/false,
+                /*force primary*/needVer,
                 topologyVersion(),
                 CU.subjectId(this, cctx),
                 resolveTaskName(),
-                deserializePortable,
+                /*deserializePortable*/false,
                 accessPolicy(cacheCtx, keys),
                 skipVals,
-                /*can remap*/true
-            ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, 
Boolean>() {
-                    @Override public Boolean 
apply(IgniteInternalFuture<Map<Object, Object>> f) {
-                        try {
-                            Map<Object, Object> map = f.get();
-
-                            // Must loop through keys, not map entries,
-                            // as map entries may not have all the keys.
-                            for (KeyCacheObject key : keys)
-                                c.apply(key, 
map.get(key.value(cacheCtx.cacheObjectContext(), false)));
-
-                            return true;
-                        }
-                        catch (Exception e) {
-                            setRollbackOnly();
-
-                            throw new GridClosureException(e);
-                        }
+                /*can remap*/true,
+                needVer,
+                /*keepCacheObject*/true
+            ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
+                @Override public Void apply(IgniteInternalFuture<Map<Object, 
Object>> f) {
+                    try {
+                        Map<Object, Object> map = f.get();
+
+                        processLoaded(map, keys, needVer, c);
+
+                        return null;
                     }
-                });
+                    catch (Exception e) {
+                        setRollbackOnly();
+
+                        throw new GridClosureException(e);
+                    }
+                }
+            });
         }
         else {
             assert cacheCtx.isLocal();
 
-            return super.loadMissing(cacheCtx, readThrough, async, keys, 
deserializePortable, skipVals, c);
+            return super.loadMissing(cacheCtx, readThrough, async, keys, 
skipVals, needVer, c);
+        }
+    }
+
+    /**
+     * @param map Loaded values.
+     * @param keys Keys.
+     * @param needVer If {@code true} version is required for loaded values.
+     * @param c Closure.
+     */
+    private void processLoaded(
+        Map<Object, Object> map,
+        final Collection<KeyCacheObject> keys,
+        boolean needVer,
+        GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c) {
+        for (KeyCacheObject key : keys) {
+            Object val = map.get(key);
+
+            if (val != null) {
+                Object v;
+                GridCacheVersion ver;
+
+                if (needVer) {
+                    T2<Object, GridCacheVersion> t = (T2)val;
+
+                    v = t.get1();
+                    ver = t.get2();
+                }
+                else {
+                    v = val;
+                    ver = null;
+                }
+
+                c.apply(key, v, ver);
+            }
+            else
+                c.apply(key, null, IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER);
         }
     }
 
@@ -555,36 +588,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter 
{
         }
     }
 
-
-    /**
-     * Removes mapping in case of optimistic tx failure on primary node.
-     *
-     * @param failedNodeId Failed node ID.
-     * @param mapQueue Mappings queue.
-     */
-    void removeKeysMapping(UUID failedNodeId, 
Iterable<GridDistributedTxMapping> mapQueue) {
-        assert failedNodeId != null;
-        assert mapQueue != null;
-
-        mappings.remove(failedNodeId);
-
-        if (!F.isEmpty(mapQueue)) {
-            for (GridDistributedTxMapping m : mapQueue) {
-                UUID nodeId = m.node().id();
-
-                GridDistributedTxMapping mapping = mappings.get(nodeId);
-
-                if (mapping != null) {
-                    for (IgniteTxEntry entry : m.entries())
-                        mapping.removeEntry(entry);
-
-                    if (mapping.entries().isEmpty())
-                        mappings.remove(nodeId);
-                }
-            }
-        }
-    }
-
     /**
      * @param nodeId Node ID to mark with explicit lock.
      * @return {@code True} if mapping was found.
@@ -621,7 +624,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         Collection<GridCacheVersion> committedVers,
         Collection<GridCacheVersion> rolledbackVers)
     {
-        Collection<IgniteTxEntry> entries = F.concat(false, mapping.reads(), 
mapping.writes());
+        Collection<IgniteTxEntry> entries = F.concat(false, mapping.writes(), 
mapping.reads());
 
         for (IgniteTxEntry txEntry : entries) {
             while (true) {
@@ -743,8 +746,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter 
{
 
         if (fut == null) {
             // Future must be created before any exception can be thrown.
-            fut = optimistic() ? new GridNearOptimisticTxPrepareFuture(cctx, 
this) :
-                new GridNearPessimisticTxPrepareFuture(cctx, this);
+            if (optimistic()) {
+                fut = serializable() ?
+                    new GridNearOptimisticSerializableTxPrepareFuture(cctx, 
this) :
+                    new GridNearOptimisticTxPrepareFuture(cctx, this);
+            }
+            else
+                fut = new GridNearPessimisticTxPrepareFuture(cctx, this);
 
             if (!prepFut.compareAndSet(null, fut))
                 return prepFut.get();
@@ -871,7 +879,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsyncLocal(
         @Nullable Collection<IgniteTxEntry> reads,
         @Nullable Collection<IgniteTxEntry> writes,
-        Map<UUID, Collection<UUID>> txNodes, boolean last,
+        Map<UUID, Collection<UUID>> txNodes,
+        boolean last,
         Collection<UUID> lastBackups
     ) {
         if (state() != PREPARING) {
@@ -899,7 +908,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         try {
             // At this point all the entries passed in must be enlisted in 
transaction because this is an
             // optimistic transaction.
-            optimisticLockEntries = writes;
+            optimisticLockEntries = (serializable() && optimistic()) ? 
F.concat(false, writes, reads) : writes;
 
             userPrepare();
 
@@ -1192,12 +1201,8 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter {
         return plc;
     }
 
-    /**
-     * @param cacheCtx Cache context.
-     * @param keys Keys.
-     * @return Expiry policy.
-     */
-    private IgniteCacheExpiryPolicy accessPolicy(GridCacheContext cacheCtx, 
Collection<KeyCacheObject> keys) {
+    /** {@inheritDoc} */
+    @Override protected IgniteCacheExpiryPolicy accessPolicy(GridCacheContext 
cacheCtx, Collection<KeyCacheObject> keys) {
         if (accessMap != null) {
             for (Map.Entry<IgniteTxKey, IgniteCacheExpiryPolicy> e : 
accessMap.entrySet()) {
                 if (e.getKey().cacheId() == cacheCtx.cacheId() && 
keys.contains(e.getKey().key()))

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index fac7a12..45477a0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -34,7 +34,7 @@ import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -48,15 +48,15 @@ import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.NOO
 /**
  * Common code for tx prepare in optimistic and pessimistic modes.
  */
-public abstract class GridNearTxPrepareFutureAdapter extends 
GridCompoundIdentityFuture<IgniteInternalTx>
+public abstract class GridNearTxPrepareFutureAdapter extends 
GridCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx>
     implements GridCacheFuture<IgniteInternalTx> {
     /** Logger reference. */
     protected static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
 
     /** */
-    private static final IgniteReducer<IgniteInternalTx, IgniteInternalTx> 
REDUCER =
-        new IgniteReducer<IgniteInternalTx, IgniteInternalTx>() {
-            @Override public boolean collect(IgniteInternalTx e) {
+    private static final IgniteReducer<GridNearTxPrepareResponse, 
IgniteInternalTx> REDUCER =
+        new IgniteReducer<GridNearTxPrepareResponse, IgniteInternalTx>() {
+            @Override public boolean collect(GridNearTxPrepareResponse e) {
                 return true;
             }
 
@@ -94,7 +94,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends 
GridCompoundIdentit
      * @param tx Transaction.
      */
     public GridNearTxPrepareFutureAdapter(GridCacheSharedContext cctx, final 
GridNearTxLocal tx) {
-        super(cctx.kernalContext(), REDUCER);
+        super(REDUCER);
 
         assert cctx != null;
         assert tx != null;
@@ -201,6 +201,7 @@ public abstract class GridNearTxPrepareFutureAdapter 
extends GridCompoundIdentit
                 }
                 catch (GridCacheEntryRemovedException ignored) {
                     // Retry.
+                    txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
index cacac13..85ed881 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
@@ -67,6 +67,8 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
      *
      * @param threadId Owning thread ID.
      * @param ver Lock version.
+     * @param serOrder Version for serializable transactions ordering.
+     * @param serReadVer Optional read entry version for optimistic 
serializable transaction.
      * @param timeout Timeout to acquire lock.
      * @param reenter Reentry flag.
      * @param tx Transaction flag.
@@ -77,6 +79,8 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
     @Nullable public GridCacheMvccCandidate addLocal(
         long threadId,
         GridCacheVersion ver,
+        @Nullable GridCacheVersion serOrder,
+        @Nullable GridCacheVersion serReadVer,
         long timeout,
         boolean reenter,
         boolean tx,
@@ -91,6 +95,11 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
         synchronized (this) {
             checkObsolete();
 
+            if (serReadVer != null) {
+                if (!checkSerializableReadVersion(serReadVer))
+                    return null;
+            }
+
             GridCacheMvcc mvcc = mvccExtras();
 
             if (mvcc == null) {
@@ -103,12 +112,16 @@ public class GridLocalCacheEntry extends 
GridCacheMapEntry {
 
             cand = mvcc.addLocal(
                 this,
+                /*nearNodeId*/null,
+                /*nearVer*/null,
                 threadId,
                 ver,
                 timeout,
+                serOrder,
                 reenter,
                 tx,
-                implicitSingle
+                implicitSingle,
+                /*dht-local*/false
             );
 
             owner = mvcc.localOwner();
@@ -191,10 +204,16 @@ public class GridLocalCacheEntry extends 
GridCacheMapEntry {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean tmLock(IgniteInternalTx tx, long timeout) throws 
GridCacheEntryRemovedException {
+    @Override public boolean tmLock(IgniteInternalTx tx,
+        long timeout,
+        @Nullable GridCacheVersion serOrder,
+        GridCacheVersion serReadVer)
+        throws GridCacheEntryRemovedException {
         GridCacheMvccCandidate cand = addLocal(
             tx.threadId(),
             tx.xidVersion(),
+            serOrder,
+            serReadVer,
             timeout,
             /*reenter*/false,
             /*tx*/true,

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 7018c4e..cb14b4c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -225,6 +225,8 @@ public final class GridLocalLockFuture<K, V> extends 
GridFutureAdapter<Boolean>
         GridCacheMvccCandidate c = entry.addLocal(
             threadId,
             lockVer,
+            null,
+            null,
             timeout,
             !inTx(),
             inTx(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 0bf6ea2..8446665 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -495,7 +495,6 @@ public class GridLocalAtomicCache<K, V> extends 
GridCacheAdapter<K, V> {
         @Nullable final Collection<? extends K> keys,
         final boolean forcePrimary,
         boolean skipTx,
-        @Nullable final GridCacheEntryEx entry,
         @Nullable UUID subjId,
         final String taskName,
         final boolean deserializePortable,
@@ -595,10 +594,6 @@ public class GridLocalAtomicCache<K, V> extends 
GridCacheAdapter<K, V> {
                 catch (GridCacheEntryRemovedException ignored) {
                     // No-op, retry.
                 }
-                catch (GridCacheFilterFailedException ignored) {
-                    // No-op, skip the key.
-                    break;
-                }
                 finally {
                     if (entry != null)
                         ctx.evicts().touch(entry, 
ctx.affinity().affinityTopologyVersion());
@@ -615,7 +610,6 @@ public class GridLocalAtomicCache<K, V> extends 
GridCacheAdapter<K, V> {
         return getAllAsync(
             keys,
             opCtx == null || !opCtx.skipStore(),
-            null,
             false,
             subjId,
             taskName,
@@ -1284,9 +1278,6 @@ public class GridLocalAtomicCache<K, V> extends 
GridCacheAdapter<K, V> {
                 catch (GridCacheEntryRemovedException ignore) {
                     assert false : "Entry cannot become obsolete while holding 
lock.";
                 }
-                catch (GridCacheFilterFailedException ignore) {
-                    assert false : "Filter should never fail with 
failFast=false and empty filter.";
-                }
             }
 
             // Store final batch.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
index c0c2284..716676f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
@@ -147,12 +147,6 @@ public class IgniteTransactionsImpl<K, V> implements 
IgniteTransactionsEx {
         cctx.kernalContext().gateway().readLock();
 
         try {
-            TransactionConfiguration cfg = 
cctx.gridConfig().getTransactionConfiguration();
-
-            if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE)
-                throw new IllegalArgumentException("SERIALIZABLE isolation 
level is disabled (to enable change " +
-                    "'txSerializableEnabled' configuration property)");
-
             IgniteInternalTx tx = cctx.tm().userTx(sysCacheCtx);
 
             if (tx != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 7d7e3e8..1c82636 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -325,7 +325,8 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
 
         threadId = Thread.currentThread().getId();
 
-        log = U.logger(cctx.kernalContext(), logRef, this);
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, this);
     }
 
     /**
@@ -374,7 +375,8 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
         implicitSingle = false;
         loc = false;
 
-        log = U.logger(cctx.kernalContext(), logRef, this);
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, this);
     }
 
     /** {@inheritDoc} */
@@ -430,6 +432,9 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
 
     /** {@inheritDoc} */
     @Override public Collection<IgniteTxEntry> optimisticLockEntries() {
+        if (serializable() && optimistic())
+            return F.concat(false, writeEntries(), readEntries());
+
         return writeEntries();
     }
 
@@ -1267,88 +1272,81 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
         if (F.isEmpty(txEntry.entryProcessors()))
             return F.t(txEntry.op(), txEntry.value());
         else {
-            try {
-                boolean recordEvt = 
cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ);
+            boolean recordEvt = 
cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ);
 
-                CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() :
-                    txEntry.cached().innerGet(this,
-                        /*swap*/false,
-                        /*read through*/false,
-                        /*fail fast*/true,
-                        /*unmarshal*/true,
-                        /*metrics*/metrics,
-                        /*event*/recordEvt,
-                        /*temporary*/true,
-                        /*subjId*/subjId,
-                        /**closure name */recordEvt ? 
F.first(txEntry.entryProcessors()).get1() : null,
-                        resolveTaskName(),
-                        null);
+            CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() :
+                txEntry.cached().innerGet(this,
+                    /*swap*/false,
+                    /*read through*/false,
+                    /*fail fast*/true,
+                    /*unmarshal*/true,
+                    /*metrics*/metrics,
+                    /*event*/recordEvt,
+                    /*temporary*/true,
+                    /*subjId*/subjId,
+                    /**closure name */recordEvt ? 
F.first(txEntry.entryProcessors()).get1() : null,
+                    resolveTaskName(),
+                    null);
 
-                boolean modified = false;
+            boolean modified = false;
 
-                Object val = null;
+            Object val = null;
 
-                Object key = null;
+            Object key = null;
 
-                GridCacheVersion ver;
-
-                try {
-                    ver = txEntry.cached().version();
-                }
-                catch (GridCacheEntryRemovedException e) {
-                    assert optimistic() : txEntry;
+            GridCacheVersion ver;
 
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to get entry version: [msg=" + 
e.getMessage() + ']');
+            try {
+                ver = txEntry.cached().version();
+            }
+            catch (GridCacheEntryRemovedException e) {
+                assert optimistic() : txEntry;
 
-                    ver = null;
-                }
+                if (log.isDebugEnabled())
+                    log.debug("Failed to get entry version: [msg=" + 
e.getMessage() + ']');
 
-                for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : 
txEntry.entryProcessors()) {
-                    CacheInvokeEntry<Object, Object> invokeEntry = new 
CacheInvokeEntry(txEntry.context(),
-                        txEntry.key(), key, cacheVal, val, ver);
+                ver = null;
+            }
 
-                    try {
-                        EntryProcessor<Object, Object, Object> processor = 
t.get1();
+            for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : 
txEntry.entryProcessors()) {
+                CacheInvokeEntry<Object, Object> invokeEntry = new 
CacheInvokeEntry(txEntry.context(),
+                    txEntry.key(), key, cacheVal, val, ver);
 
-                        processor.process(invokeEntry, t.get2());
+                try {
+                    EntryProcessor<Object, Object, Object> processor = 
t.get1();
 
-                        val = invokeEntry.getValue();
+                    processor.process(invokeEntry, t.get2());
 
-                        key = invokeEntry.key();
-                    }
-                    catch (Exception ignore) {
-                        // No-op.
-                    }
+                    val = invokeEntry.getValue();
 
-                    modified |= invokeEntry.modified();
+                    key = invokeEntry.key();
+                }
+                catch (Exception ignore) {
+                    // No-op.
                 }
 
-                if (modified)
-                    cacheVal = 
cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val));
+                modified |= invokeEntry.modified();
+            }
 
-                GridCacheOperation op = modified ? (val == null ? DELETE : 
UPDATE) : NOOP;
+            if (modified)
+                cacheVal = 
cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(val));
 
-                if (op == NOOP) {
-                    ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
+            GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) 
: NOOP;
 
-                    if (expiry != null) {
-                        long ttl = CU.toTtl(expiry.getExpiryForAccess());
+            if (op == NOOP) {
+                ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry);
 
-                        txEntry.ttl(ttl);
+                if (expiry != null) {
+                    long ttl = CU.toTtl(expiry.getExpiryForAccess());
 
-                        if (ttl == CU.TTL_ZERO)
-                            op = DELETE;
-                    }
-                }
+                    txEntry.ttl(ttl);
 
-                return F.t(op, cacheVal);
+                    if (ttl == CU.TTL_ZERO)
+                        op = DELETE;
+                }
             }
-            catch (GridCacheFilterFailedException e) {
-                assert false : "Empty filter failed for innerGet: " + e;
 
-                return null;
-            }
+            return F.t(op, cacheVal);
         }
     }
 
@@ -1498,9 +1496,8 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
      * @param e Entry to evict if it qualifies for eviction.
      * @param primaryOnly Flag to try to evict only on primary node.
      * @return {@code True} if attempt was made to evict the entry.
-     * @throws IgniteCheckedException If failed.
      */
-    protected boolean evictNearEntry(IgniteTxEntry e, boolean primaryOnly) 
throws IgniteCheckedException {
+    protected boolean evictNearEntry(IgniteTxEntry e, boolean primaryOnly) {
         assert e != null;
 
         if (isNearLocallyMapped(e, primaryOnly)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 2462dda..9eb2808 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -66,6 +66,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, 
Message {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Dummy version for non-existing entry read in SERIALIZABLE transaction. 
*/
+    public static final GridCacheVersion SER_READ_EMPTY_ENTRY_VER = new 
GridCacheVersion(0, 0, 0, 0);
+
+    /** Dummy version for any existing entry read in SERIALIZABLE transaction. 
*/
+    public static final GridCacheVersion SER_READ_NOT_EMPTY_VER = new 
GridCacheVersion(0, 0, 0, 1);
+
     /** Owning transaction. */
     @GridToStringExclude
     @GridDirectTransient
@@ -175,6 +181,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, 
Message {
      */
     private byte flags;
 
+    /** */
+    private GridCacheVersion serReadVer;
+
     /**
      * Required by {@link Externalizable}
      */
@@ -316,6 +325,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, 
Message {
         cp.conflictVer = conflictVer;
         cp.expiryPlc = expiryPlc;
         cp.flags = flags;
+        cp.serReadVer = serReadVer;
 
         return cp;
     }
@@ -822,6 +832,23 @@ public class IgniteTxEntry implements GridPeerDeployAware, 
Message {
         this.entryProcessorCalcVal = entryProcessorCalcVal;
     }
 
+    /**
+     * @return Read version for serializable transaction.
+     */
+    @Nullable public GridCacheVersion serializableReadVersion() {
+        return serReadVer;
+    }
+
+    /**
+     * @param serReadVer Read version for serializable transaction.
+     */
+    public void serializableReadVersion(GridCacheVersion serReadVer) {
+        assert this.serReadVer == null;
+        assert serReadVer != null;
+
+        this.serReadVer = serReadVer;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
@@ -884,18 +911,24 @@ public class IgniteTxEntry implements 
GridPeerDeployAware, Message {
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeByteArray("transformClosBytes", 
transformClosBytes))
+                if (!writer.writeMessage("serReadVer", serReadVer))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeLong("ttl", ttl))
+                if (!writer.writeByteArray("transformClosBytes", 
transformClosBytes))
                     return false;
 
                 writer.incrementState();
 
             case 10:
+                if (!writer.writeLong("ttl", ttl))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
                 if (!writer.writeMessage("val", val))
                     return false;
 
@@ -979,7 +1012,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, 
Message {
                 reader.incrementState();
 
             case 8:
-                transformClosBytes = 
reader.readByteArray("transformClosBytes");
+                serReadVer = reader.readMessage("serReadVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -987,7 +1020,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, 
Message {
                 reader.incrementState();
 
             case 9:
-                ttl = reader.readLong("ttl");
+                transformClosBytes = 
reader.readByteArray("transformClosBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -995,6 +1028,14 @@ public class IgniteTxEntry implements 
GridPeerDeployAware, Message {
                 reader.incrementState();
 
             case 10:
+                ttl = reader.readLong("ttl");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
                 val = reader.readMessage("val");
 
                 if (!reader.isLastRead())
@@ -1014,7 +1055,7 @@ public class IgniteTxEntry implements 
GridPeerDeployAware, Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 12;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 530fbdf..d9786a8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFutureCancelledException;
+import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
@@ -417,7 +418,8 @@ public class IgniteTxHandler {
 
             if (tx.isRollbackOnly()) {
                 try {
-                    tx.rollback();
+                    if (tx.state() != TransactionState.ROLLED_BACK && 
tx.state() != TransactionState.ROLLING_BACK)
+                        tx.rollback();
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to rollback transaction: " + tx, e);

Reply via email to