IGNITE-1525 Return value for cache operation can be lost with onePhaseCommit


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9b72d18d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9b72d18d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9b72d18d

Branch: refs/heads/master
Commit: 9b72d18dd94ec1383653f00474c102804c02790a
Parents: c3eff6b
Author: Anton Vinogradov <a...@apache.org>
Authored: Mon Sep 19 18:07:20 2016 +0300
Committer: Anton Vinogradov <a...@apache.org>
Committed: Mon Sep 19 18:07:20 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  12 +
 .../communication/GridIoMessageFactory.java     |   6 +
 .../GridCacheReturnCompletableWrapper.java      | 101 +++++++++
 .../cache/GridDeferredAckMessageSender.java     | 219 ++++++++++++++++++
 .../GridDistributedTxRemoteAdapter.java         |  59 +++--
 .../distributed/dht/GridDhtTxFinishFuture.java  |  12 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |  33 ++-
 .../dht/GridDhtTxFinishResponse.java            |  52 ++++-
 .../dht/GridDhtTxOnePhaseCommitAckRequest.java  | 134 +++++++++++
 .../distributed/dht/GridDhtTxPrepareFuture.java |   6 +-
 .../dht/GridDhtTxPrepareRequest.java            |  93 +++++---
 .../cache/distributed/dht/GridDhtTxRemote.java  |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          | 227 +++++--------------
 ...arOptimisticSerializableTxPrepareFuture.java |   4 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   7 +-
 .../GridNearPessimisticTxPrepareFuture.java     |   4 +-
 .../near/GridNearTxFinishFuture.java            | 112 +++++++--
 .../cache/transactions/IgniteTxAdapter.java     |  46 +++-
 .../cache/transactions/IgniteTxHandler.java     | 163 ++++++++++---
 .../transactions/IgniteTxLocalAdapter.java      |  19 +-
 .../cache/transactions/IgniteTxManager.java     | 154 ++++++++++++-
 .../IgniteCachePutRetryAbstractSelfTest.java    |  39 +++-
 ...gniteCachePutRetryTransactionalSelfTest.java |  75 +++++-
 .../config/benchmark-client-mode.properties     |   2 +
 .../config/benchmark-tx-win.properties          |   2 +
 .../yardstick/config/benchmark-tx.properties    |   2 +
 .../yardstick/config/benchmark-win.properties   |   2 +
 modules/yardstick/config/benchmark.properties   |   2 +
 .../cache/IgniteGetAndPutBenchmark.java         |  41 ++++
 .../cache/IgniteGetAndPutTxBenchmark.java       |  70 ++++++
 .../cache/IgniteInvokeTxBenchmark.java          |  40 ++++
 31 files changed, 1405 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 7c428a6..ab6403f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -290,6 +290,18 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT = 
"IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT";
 
     /**
+     * One phase commit deferred ack request timeout.
+     */
+    public static final String 
IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT =
+        "IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT";
+
+    /**
+     * One phase commit deferred ack request buffer size.
+     */
+    public static final String 
IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE =
+        "IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE";
+
+    /**
      * If this property set then debug console will be opened for H2 indexing 
SPI.
      */
     public static final String IGNITE_H2_DEBUG_CONSOLE = 
"IGNITE_H2_DEBUG_CONSOLE";

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5f60215..8b8a734 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -62,6 +62,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRe
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
@@ -160,6 +161,11 @@ public class GridIoMessageFactory implements 
MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -27:
+                msg = new GridDhtTxOnePhaseCommitAckRequest();
+
+                break;
+
             case -26:
                 msg = new TxLockList();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java
new file mode 100644
index 0000000..8ceaf71
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides initialized GridCacheReturn.
+ * <p>
+ * The wrapper keeps a single slot that starts as {@code null}. {@link #fut()} may park a pending
+ * future in the slot, and {@link #initialize(GridCacheReturn)} stores the final value there,
+ * completing the parked future first when one exists.
+ */
+public class GridCacheReturnCompletableWrapper {
+    /** Atomic updater used for compare-and-set operations on the {@code o} field. */
+    private static final 
AtomicReferenceFieldUpdater<GridCacheReturnCompletableWrapper, Object> 
COMPLETABLE_WRAPPER_UPD =
+        
AtomicReferenceFieldUpdater.newUpdater(GridCacheReturnCompletableWrapper.class, 
Object.class, "o");
+
+    /**
+     * Current state: {@code null} while nothing is stored, a {@link GridFutureAdapter} installed by
+     * {@link #fut()}, or the {@link GridCacheReturn} stored by {@link #initialize(GridCacheReturn)}.
+     */
+    private volatile Object o;
+
+    /** Node id. */
+    private final UUID nodeId;
+
+    /**
+     * @param nodeId Node id.
+     */
+    public GridCacheReturnCompletableWrapper(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return ID of the node that initiated the tx, or {@code null} if that node is the local one.
+     */
+    @Nullable public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Marks as initialized.
+     * <p>
+     * When nothing is stored yet the value is installed with a compare-and-set, and the method
+     * calls itself again if that compare-and-set fails. When a future installed by {@link #fut()}
+     * is stored, the future is completed with {@code ret} first and is then replaced by {@code ret}.
+     *
+     * @param ret Return.
+     * @throws IllegalStateException If the slot holds anything other than {@code null} or a future.
+     */
+    public void initialize(GridCacheReturn ret) {
+        final Object obj = this.o;
+
+        if (obj == null) {
+            boolean res = COMPLETABLE_WRAPPER_UPD.compareAndSet(this, null, 
ret);
+
+            if (!res)
+                initialize(ret);
+        }
+        else if (obj instanceof GridFutureAdapter) {
+            ((GridFutureAdapter)obj).onDone(ret);
+
+            boolean res = COMPLETABLE_WRAPPER_UPD.compareAndSet(this, obj, 
ret);
+
+            assert res;
+        }
+        else
+            throw new IllegalStateException("GridCacheReturnCompletableWrapper 
can't be reinitialized");
+    }
+
+    /**
+     * Allows waiting for the properly initialized value.
+     * <p>
+     * NOTE(review): when this call installs the pending future itself, it returns a fresh read of
+     * the field rather than the instance it just created. If {@link #initialize(GridCacheReturn)}
+     * replaces the future with the value in between, that read is no longer a future and the cast
+     * looks like it would fail - TODO confirm and consider returning the created instance.
+     *
+     * @return Finished future when the value is already stored, otherwise the pending future kept
+     *      in this wrapper (created on demand).
+     * @throws IllegalStateException If the slot holds an object of an unexpected type.
+     */
+    public IgniteInternalFuture<GridCacheReturn> fut() {
+        final Object obj = this.o;
+
+        if (obj instanceof GridCacheReturn)
+            return new GridFinishedFuture<>((GridCacheReturn)obj);
+        else if (obj instanceof IgniteInternalFuture)
+            return (IgniteInternalFuture)obj;
+        else if (obj == null) {
+            boolean res = COMPLETABLE_WRAPPER_UPD.compareAndSet(this, null, 
new GridFutureAdapter<>());
+
+            if (res)
+                return (IgniteInternalFuture)this.o;
+            else
+                return fut();
+        }
+        else
+            throw new IllegalStateException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
new file mode 100644
index 0000000..7145dc2
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jsr166.ConcurrentHashMap8;
+import org.jsr166.ConcurrentLinkedDeque8;
+
+/**
+ * Collects versions to acknowledge in one buffer per destination node and hands each buffer to
+ * {@link #finish} either when it holds more than {@link #getBufferSize()} versions or when the
+ * buffer's timeout object fires.
+ */
+public abstract class GridDeferredAckMessageSender {
+    /** Deferred message buffers, keyed by destination node ID. */
+    private ConcurrentMap<UUID, DeferredAckMessageBuffer> 
deferredAckMsgBuffers = new ConcurrentHashMap8<>();
+
+    /** Timeout processor the buffers are registered with. */
+    private GridTimeoutProcessor time;
+
+    /** Closure processor used to run the flush triggered by a buffer timeout. */
+    public GridClosureProcessor closure;
+
+    /**
+     * @param time Time.
+     * @param closure Closure.
+     */
+    public GridDeferredAckMessageSender(GridTimeoutProcessor time,
+        GridClosureProcessor closure) {
+        this.time = time;
+        this.closure = closure;
+    }
+
+    /**
+     * @return Value added to {@code U.currentTimeMillis()} to compute the end time of a new buffer
+     *      (presumably milliseconds - TODO confirm).
+     */
+    public abstract int getTimeout();
+
+    /**
+     * @return Threshold; a buffer is flushed once it holds more than this many versions.
+     */
+    public abstract int getBufferSize();
+
+    /**
+     * Called when a buffer is flushed.
+     *
+     * @param nodeId Node ID the flushed buffer was created for.
+     * @param vers Versions accumulated in the flushed buffer.
+     */
+    public abstract void finish(UUID nodeId, 
ConcurrentLinkedDeque8<GridCacheVersion> vers);
+
+    /**
+     * Flushes every buffer that is still registered in the map.
+     * NOTE(review): this neither sets the buffer guard flag nor takes its write lock - confirm that
+     * callers guarantee no concurrent {@link #sendDeferredAckMessage} calls at this point.
+     */
+    public void stop() {
+        for (DeferredAckMessageBuffer buf : deferredAckMsgBuffers.values())
+            buf.finish0();
+    }
+
+    /**
+     * Adds the version to the buffer of the given node. A buffer is created and registered with the
+     * timeout processor when the node has none; when the current buffer refuses the version, it is
+     * removed from the map and the attempt is repeated.
+     *
+     * @param nodeId Node ID to send message to.
+     * @param ver Version to ack.
+     */
+    public void sendDeferredAckMessage(UUID nodeId, GridCacheVersion ver) {
+        while (true) {
+            DeferredAckMessageBuffer buf = deferredAckMsgBuffers.get(nodeId);
+
+            if (buf == null) {
+                buf = new DeferredAckMessageBuffer(nodeId);
+
+                DeferredAckMessageBuffer old = 
deferredAckMsgBuffers.putIfAbsent(nodeId, buf);
+
+                if (old == null) {
+                    // We have successfully added buffer to map.
+                    time.addTimeoutObject(buf);
+                }
+                else
+                    buf = old;
+            }
+
+            if (!buf.add(ver))
+                // Some thread is sending filled up buffer, we can remove it.
+                deferredAckMsgBuffers.remove(nodeId, buf);
+            else
+                break;
+        }
+    }
+
+    /**
+     * Deferred message buffer. Threads adding versions hold the read lock; the thread that flushes
+     * the buffer takes the write lock before calling {@link #finish0()}.
+     */
+    private class DeferredAckMessageBuffer extends ReentrantReadWriteLock 
implements GridTimeoutObject {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Filled atomic flag; once set, {@link #add} rejects further versions. */
+        private AtomicBoolean guard = new AtomicBoolean(false);
+
+        /** Versions. */
+        private ConcurrentLinkedDeque8<GridCacheVersion> vers = new 
ConcurrentLinkedDeque8<>();
+
+        /** Node ID. */
+        private final UUID nodeId;
+
+        /** Timeout ID, derived from the node ID. */
+        private final IgniteUuid timeoutId;
+
+        /** End time: creation time plus {@link #getTimeout()}. */
+        private final long endTime;
+
+        /**
+         * @param nodeId Node ID to send message to.
+         */
+        private DeferredAckMessageBuffer(UUID nodeId) {
+            this.nodeId = nodeId;
+
+            timeoutId = IgniteUuid.fromUuid(nodeId);
+
+            endTime = U.currentTimeMillis() + getTimeout();
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid timeoutId() {
+            return timeoutId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Flushes the buffer through the closure processor, but only if this call is the one that
+         * sets the guard flag.
+         */
+        @Override public void onTimeout() {
+            if (guard.compareAndSet(false, true)) {
+                closure.runLocalSafe(new Runnable() {
+                    @Override public void run() {
+                        writeLock().lock();
+
+                        try {
+                            finish0();
+                        }
+                        finally {
+                            writeLock().unlock();
+                        }
+                    }
+                });
+            }
+        }
+
+        /**
+         * Adds deferred request to buffer. The caller whose addition pushes the size above
+         * {@link #getBufferSize()} and who wins the guard flag flushes the buffer and removes its
+         * timeout object.
+         *
+         * @param ver Version to send.
+         * @return {@code True} if request was handled, {@code false} if this 
buffer is filled and cannot be used.
+         */
+        public boolean add(GridCacheVersion ver) {
+            readLock().lock();
+
+            boolean snd = false;
+
+            try {
+                if (guard.get())
+                    return false;
+
+                vers.add(ver);
+
+                if (vers.sizex() > getBufferSize() && 
guard.compareAndSet(false, true))
+                    snd = true;
+            }
+            finally {
+                readLock().unlock();
+            }
+
+            if (snd) {
+                // Wait all threads in read lock to finish.
+                writeLock().lock();
+
+                try {
+                    finish0();
+
+                    time.removeTimeoutObject(this);
+                }
+                finally {
+                    writeLock().unlock();
+                }
+            }
+
+            return true;
+        }
+
+        /**
+         * Sends deferred notification message and removes this buffer from 
pending responses map.
+         */
+        private void finish0() {
+            finish(nodeId, vers);
+
+            deferredAckMsgBuffers.remove(nodeId, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 9d9862a..4adfa8b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -36,6 +36,8 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import 
org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import 
org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
@@ -448,7 +450,25 @@ public class GridDistributedTxRemoteAdapter extends 
IgniteTxAdapter
 
                 Map<IgniteTxKey, IgniteTxEntry> writeMap = txState.writeMap();
 
+                GridCacheReturnCompletableWrapper wrapper = null;
+
                 if (!F.isEmpty(writeMap)) {
+                    GridCacheReturn ret = null;
+
+                    if (!near() && !local() && onePhaseCommit()) {
+                        if (needReturnValue()) {
+                            ret = new GridCacheReturn(null, 
cctx.localNodeId().equals(otherNodeId()), true, null, true);
+
+                            UUID origNodeId = otherNodeId(); // Originating 
node.
+
+                            cctx.tm().addCommittedTxReturn(this,
+                                wrapper = new 
GridCacheReturnCompletableWrapper(
+                                    !cctx.localNodeId().equals(origNodeId) ? 
origNodeId : null));
+                        }
+                        else
+                            cctx.tm().addCommittedTx(this, 
this.nearXidVersion(), null);
+                    }
+
                     // Register this transaction as completed prior to 
write-phase to
                     // ensure proper lock ordering for removed entries.
                     cctx.tm().addCommittedTx(this);
@@ -457,13 +477,13 @@ public class GridDistributedTxRemoteAdapter extends 
IgniteTxAdapter
 
                     batchStoreCommit(writeMap().values());
 
-                    // Node that for near transactions we grab all entries.
-                    for (IgniteTxEntry txEntry : (near() ? allEntries() : 
writeEntries())) {
-                        GridCacheContext cacheCtx = txEntry.context();
+                    try {
+                        // Node that for near transactions we grab all entries.
+                        for (IgniteTxEntry txEntry : (near() ? allEntries() : 
writeEntries())) {
+                            GridCacheContext cacheCtx = txEntry.context();
 
-                        boolean replicate = cacheCtx.isDrEnabled();
+                            boolean replicate = cacheCtx.isDrEnabled();
 
-                        try {
                             while (true) {
                                 try {
                                     GridCacheEntryEx cached = txEntry.cached();
@@ -486,7 +506,7 @@ public class GridDistributedTxRemoteAdapter extends 
IgniteTxAdapter
                                         txEntry.cached().unswap(false);
 
                                     IgniteBiTuple<GridCacheOperation, 
CacheObject> res =
-                                        applyTransformClosures(txEntry, false);
+                                        applyTransformClosures(txEntry, false, 
ret);
 
                                     GridCacheOperation op = res.get1();
                                     CacheObject val = res.get2();
@@ -672,21 +692,26 @@ public class GridDistributedTxRemoteAdapter extends 
IgniteTxAdapter
                                 }
                             }
                         }
-                        catch (Throwable ex) {
-                            // In case of error, we still make the best effort 
to commit,
-                            // as there is no way to rollback at this point.
-                            err = new 
IgniteTxHeuristicCheckedException("Commit produced a runtime exception " +
-                                "(all transaction entries will be 
invalidated): " + CU.txString(this), ex);
+                    }
+                    catch (Throwable ex) {
+                        // In case of error, we still make the best effort to 
commit,
+                        // as there is no way to rollback at this point.
+                        err = new IgniteTxHeuristicCheckedException("Commit 
produced a runtime exception " +
+                            "(all transaction entries will be invalidated): " 
+ CU.txString(this), ex);
 
-                            U.error(log, "Commit failed.", err);
+                        U.error(log, "Commit failed.", err);
 
-                            uncommit();
+                        uncommit();
 
-                            state(UNKNOWN);
+                        state(UNKNOWN);
 
-                            if (ex instanceof Error)
-                                throw (Error)ex;
-                        }
+                        if (ex instanceof Error)
+                            throw (Error)ex;
+
+                    }
+                    finally {
+                        if (wrapper != null)
+                            wrapper.initialize(ret);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index d2e26b4..ac2ab41 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -351,7 +351,9 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCompoundIdentityFutur
                 tx.size(),
                 tx.subjectId(),
                 tx.taskNameHash(),
-                tx.activeCachesDeploymentEnabled());
+                tx.activeCachesDeploymentEnabled(),
+                false,
+                false);
 
             try {
                 cctx.io().send(n, req, tx.ioPolicy());
@@ -448,7 +450,9 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCompoundIdentityFutur
                 tx.subjectId(),
                 tx.taskNameHash(),
                 tx.activeCachesDeploymentEnabled(),
-                updCntrs);
+                updCntrs,
+                false,
+                false);
 
             req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : 
tx.xidVersion());
 
@@ -516,7 +520,9 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCompoundIdentityFutur
                     tx.size(),
                     tx.subjectId(),
                     tx.taskNameHash(),
-                    tx.activeCachesDeploymentEnabled());
+                    tx.activeCachesDeploymentEnabled(),
+                    false,
+                    false);
 
                 req.writeVersion(tx.writeVersion());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 2d98e0d..c618a18 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -46,6 +46,9 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
     /** */
     public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
 
+    /** */
+    public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x02;
+
     /** Near node ID. */
     private UUID nearNodeId;
 
@@ -141,7 +144,9 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
         int txSize,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean addDepInfo
+        boolean addDepInfo,
+        boolean retVal,
+        boolean waitRemoteTxs
     ) {
         super(
             xidVer,
@@ -172,6 +177,9 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
         this.sysInvalidate = sysInvalidate;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
+
+        needReturnValue(retVal);
+        waitRemoteTransactions(waitRemoteTxs);
     }
 
     /**
@@ -224,11 +232,13 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
         @Nullable UUID subjId,
         int taskNameHash,
         boolean addDepInfo,
-        Collection<Long> updateIdxs
+        Collection<Long> updateIdxs,
+        boolean retVal,
+        boolean waitRemoteTxs
     ) {
         this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, 
isolation, commit, invalidate, sys, plc,
             sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, 
rolledbackVers, pendingVers, txSize,
-            subjId, taskNameHash, addDepInfo);
+            subjId, taskNameHash, addDepInfo, retVal, waitRemoteTxs);
 
         if (updateIdxs != null && !updateIdxs.isEmpty()) {
             partUpdateCnt = new GridLongList(updateIdxs.size());
@@ -339,6 +349,23 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest {
             flags &= ~WAIT_REMOTE_TX_FLAG_MASK;
     }
 
+    /**
+     * @return {@code True} if the {@link #NEED_RETURN_VALUE_FLAG_MASK} bit is set in the flags.
+     */
+    public boolean needReturnValue() {
+        return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @param retVal {@code True} to set the {@link #NEED_RETURN_VALUE_FLAG_MASK} bit, else clear it.
+     */
+    public void needReturnValue(boolean retVal) {
+        if (retVal)
+            flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
+        else
+            flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtTxFinishRequest.class, this, 
super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index 78dc16f..0618172 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -19,9 +19,10 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -51,6 +52,9 @@ public class GridDhtTxFinishResponse extends 
GridDistributedTxFinishResponse {
     /** Flag indicating if this is a check-committed response. */
     private boolean checkCommitted;
 
+    /** Cache return value. */
+    private GridCacheReturn retVal;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -112,6 +116,14 @@ public class GridDhtTxFinishResponse extends 
GridDistributedTxFinishResponse {
 
         if (checkCommittedErr != null && checkCommittedErrBytes == null)
             checkCommittedErrBytes = 
ctx.marshaller().marshal(checkCommittedErr);
+
+        if (retVal != null && retVal.cacheId() != 0) {
+            GridCacheContext cctx = ctx.cacheContext(retVal.cacheId());
+
+            assert cctx != null : retVal.cacheId();
+
+            retVal.prepareMarshal(cctx);
+        }
     }
 
     /** {@inheritDoc} */
@@ -121,6 +133,28 @@ public class GridDhtTxFinishResponse extends 
GridDistributedTxFinishResponse {
 
         if (checkCommittedErrBytes != null && checkCommittedErr == null)
             checkCommittedErr = 
ctx.marshaller().unmarshal(checkCommittedErrBytes, U.resolveClassLoader(ldr, 
ctx.gridConfig()));
+
+        if (retVal != null && retVal.cacheId() != 0) {
+            GridCacheContext cctx = ctx.cacheContext(retVal.cacheId());
+
+            assert cctx != null : retVal.cacheId();
+
+            retVal.finishUnmarshal(cctx, ldr);
+        }
+    }
+
+    /**
+     * @param retVal Cache return value to carry in this response; may be {@code null}.
+     */
+    public void returnValue(GridCacheReturn retVal) {
+        this.retVal = retVal;
+    }
+
+    /**
+     * @return Cache return value carried by this response, or {@code null} if none was set.
+     */
+    public GridCacheReturn returnValue() {
+        return retVal;
     }
 
     /** {@inheritDoc} */
@@ -161,6 +195,12 @@ public class GridDhtTxFinishResponse extends 
GridDistributedTxFinishResponse {
 
                 writer.incrementState();
 
+            case 8:
+                if (!writer.writeMessage("retVal", retVal))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -201,6 +241,14 @@ public class GridDhtTxFinishResponse extends 
GridDistributedTxFinishResponse {
 
                 reader.incrementState();
 
+            case 8:
+                retVal = reader.readMessage("retVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridDhtTxFinishResponse.class);
@@ -213,6 +261,6 @@ public class GridDhtTxFinishResponse extends 
GridDistributedTxFinishResponse {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 8;
+        return 9;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
new file mode 100644
index 0000000..0c8ae69
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * One-phase commit near transaction ack request.
+ */
+public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Lock or transaction versions. */
+    @GridToStringInclude
+    @GridDirectCollection(GridCacheVersion.class)
+    protected Collection<GridCacheVersion> vers;
+
+    /**
+     * Default constructor.
+     */
+    public GridDhtTxOnePhaseCommitAckRequest() {
+        // No-op.
+    }
+
+    /**
+     *
+     * @param vers Near Tx xid Versions.
+     */
+    public GridDhtTxOnePhaseCommitAckRequest(Collection<GridCacheVersion> 
vers) {
+        this.vers = vers;
+    }
+
+    /**
+     * @return Near transaction xid versions carried by this request.
+     */
+    public Collection<GridCacheVersion> versions() {
+        return vers;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtTxOnePhaseCommitAckRequest.class, this, 
super.toString());
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeCollection("vers", vers, 
MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                vers = reader.readCollection("vers", 
MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return 
reader.afterMessageRead(GridDhtTxOnePhaseCommitAckRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -27;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index ec73bff..1dbda69 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1245,7 +1245,8 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
                         tx.onePhaseCommit(),
                         tx.subjectId(),
                         tx.taskNameHash(),
-                        tx.activeCachesDeploymentEnabled());
+                        tx.activeCachesDeploymentEnabled(),
+                        retVal);
 
                     int idx = 0;
 
@@ -1356,7 +1357,8 @@ public final class GridDhtTxPrepareFuture extends 
GridCompoundFuture<IgniteInter
                             tx.onePhaseCommit(),
                             tx.subjectId(),
                             tx.taskNameHash(),
-                            tx.activeCachesDeploymentEnabled());
+                            tx.activeCachesDeploymentEnabled(),
+                            retVal);
 
                         for (IgniteTxEntry entry : nearMapping.entries()) {
                             if (CU.writes().apply(entry)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 1cdc96f..a8f2087 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -52,6 +52,9 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Bit mask of the "need return value" flag stored in {@link #flags}. */
+    public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x01;
+
     /** Near node ID. */
     private UUID nearNodeId;
 
@@ -100,6 +103,9 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
     /** Preload keys. */
     private BitSet preloadKeys;
 
+    /** Bit flags of this request; see {@link #NEED_RETURN_VALUE_FLAG_MASK}. */
+    private byte flags;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -118,6 +124,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
      * @param txNodes Transaction nodes mapping.
      * @param nearXidVer Near transaction ID.
      * @param last {@code True} if this is last prepare request for node.
+     * @param retVal Need return value flag.
      * @param addDepInfo Deployment info flag.
      */
     public GridDhtTxPrepareRequest(
@@ -134,7 +141,8 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
         boolean onePhaseCommit,
         UUID subjId,
         int taskNameHash,
-        boolean addDepInfo) {
+        boolean addDepInfo,
+        boolean retVal) {
         super(tx, timeout, null, dhtWrites, txNodes, onePhaseCommit, 
addDepInfo);
 
         assert futId != null;
@@ -149,12 +157,31 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
 
+        needReturnValue(retVal);
+
         invalidateNearEntries = new BitSet(dhtWrites == null ? 0 : 
dhtWrites.size());
 
         nearNodeId = tx.nearNodeId();
     }
 
     /**
+     * @return Flag indicating whether transaction needs return value.
+     */
+    public boolean needReturnValue() {
+        return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @param retVal Need return value.
+     */
+    public void needReturnValue(boolean retVal) {
+        if (retVal)
+            flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK);
+        else
+            flags &= ~NEED_RETURN_VALUE_FLAG_MASK;
+    }
+
+    /**
      * @return {@code True} if this is last prepare request for node.
      */
     public boolean last() {
@@ -348,78 +375,84 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
 
         switch (writer.state()) {
             case 23:
-                if (!writer.writeIgniteUuid("futId", futId))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeBitSet("invalidateNearEntries", 
invalidateNearEntries))
+                if (!writer.writeIgniteUuid("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 25:
-                if (!writer.writeBoolean("last", last))
+                if (!writer.writeBitSet("invalidateNearEntries", 
invalidateNearEntries))
                     return false;
 
                 writer.incrementState();
 
             case 26:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeBoolean("last", last))
                     return false;
 
                 writer.incrementState();
 
             case 27:
-                if (!writer.writeUuid("nearNodeId", nearNodeId))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
             case 28:
-                if (!writer.writeCollection("nearWrites", nearWrites, 
MessageCollectionItemType.MSG))
+                if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
 
                 writer.incrementState();
 
             case 29:
-                if (!writer.writeMessage("nearXidVer", nearXidVer))
+                if (!writer.writeCollection("nearWrites", nearWrites, 
MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 30:
-                if (!writer.writeCollection("ownedKeys", ownedKeys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("nearXidVer", nearXidVer))
                     return false;
 
                 writer.incrementState();
 
             case 31:
-                if (!writer.writeCollection("ownedVals", ownedVals, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("ownedKeys", ownedKeys, 
MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 32:
-                if (!writer.writeBitSet("preloadKeys", preloadKeys))
+                if (!writer.writeCollection("ownedVals", ownedVals, 
MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 33:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBitSet("preloadKeys", preloadKeys))
                     return false;
 
                 writer.incrementState();
 
             case 34:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 35:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 36:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -442,7 +475,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
 
         switch (reader.state()) {
             case 23:
-                futId = reader.readIgniteUuid("futId");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -450,7 +483,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 24:
-                invalidateNearEntries = 
reader.readBitSet("invalidateNearEntries");
+                futId = reader.readIgniteUuid("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -458,7 +491,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 25:
-                last = reader.readBoolean("last");
+                invalidateNearEntries = 
reader.readBitSet("invalidateNearEntries");
 
                 if (!reader.isLastRead())
                     return false;
@@ -466,7 +499,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 26:
-                miniId = reader.readIgniteUuid("miniId");
+                last = reader.readBoolean("last");
 
                 if (!reader.isLastRead())
                     return false;
@@ -474,7 +507,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 27:
-                nearNodeId = reader.readUuid("nearNodeId");
+                miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -482,7 +515,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 28:
-                nearWrites = reader.readCollection("nearWrites", 
MessageCollectionItemType.MSG);
+                nearNodeId = reader.readUuid("nearNodeId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -490,7 +523,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 29:
-                nearXidVer = reader.readMessage("nearXidVer");
+                nearWrites = reader.readCollection("nearWrites", 
MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -498,7 +531,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 30:
-                ownedKeys = reader.readCollection("ownedKeys", 
MessageCollectionItemType.MSG);
+                nearXidVer = reader.readMessage("nearXidVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -506,7 +539,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 31:
-                ownedVals = reader.readCollection("ownedVals", 
MessageCollectionItemType.MSG);
+                ownedKeys = reader.readCollection("ownedKeys", 
MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -514,7 +547,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 32:
-                preloadKeys = reader.readBitSet("preloadKeys");
+                ownedVals = reader.readCollection("ownedVals", 
MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -522,7 +555,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 33:
-                subjId = reader.readUuid("subjId");
+                preloadKeys = reader.readBitSet("preloadKeys");
 
                 if (!reader.isLastRead())
                     return false;
@@ -530,7 +563,7 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 34:
-                taskNameHash = reader.readInt("taskNameHash");
+                subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -538,6 +571,14 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
                 reader.incrementState();
 
             case 35:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 36:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -557,6 +598,6 @@ public class GridDhtTxPrepareRequest extends 
GridDistributedTxPrepareRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 36;
+        return 37;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index dc27eb1..6ad20c7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -189,9 +189,9 @@ public class GridDhtTxRemote extends 
GridDistributedTxRemoteAdapter {
             commitVer,
             sys,
             plc,
-            concurrency, 
-            isolation, 
-            invalidate, 
+            concurrency,
+            isolation,
+            invalidate,
             timeout,
             txSize,
             subjId,

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 1e45fa7..30a3d57 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -29,9 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorResult;
@@ -60,6 +57,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
+import 
org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -82,7 +80,6 @@ import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -102,11 +99,9 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
@@ -144,7 +139,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
     private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> 
updateReplyClos;
 
     /** Pending */
-    private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new 
ConcurrentHashMap8<>();
+    private GridDeferredAckMessageSender deferredUpdateMessageSender;
 
     /** */
     private GridNearAtomicCache<K, V> near;
@@ -240,6 +235,53 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
+        deferredUpdateMessageSender = new 
GridDeferredAckMessageSender(ctx.time(), ctx.closures()) {
+            @Override public int getTimeout() {
+                return DEFERRED_UPDATE_RESPONSE_TIMEOUT;
+            }
+
+            @Override public int getBufferSize() {
+                return DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE;
+            }
+
+            @Override public void finish(UUID nodeId, 
ConcurrentLinkedDeque8<GridCacheVersion> vers) {
+                GridDhtAtomicDeferredUpdateResponse msg = new 
GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
+                    vers, ctx.deploymentEnabled());
+
+                try {
+                    ctx.kernalContext().gateway().readLock();
+
+                    try {
+                        ctx.io().send(nodeId, msg, ctx.ioPolicy());
+
+                        if (msgLog.isDebugEnabled()) {
+                            msgLog.debug("Sent deferred DHT update response 
[futIds=" + msg.futureVersions() +
+                                ", node=" + nodeId + ']');
+                        }
+                    }
+                    finally {
+                        ctx.kernalContext().gateway().readUnlock();
+                    }
+                }
+                catch (IllegalStateException ignored) {
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("Failed to send deferred DHT update 
response, node is stopping [" +
+                            "futIds=" + msg.futureVersions() + ", node=" + 
nodeId + ']');
+                    }
+                }
+                catch (ClusterTopologyCheckedException ignored) {
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("Failed to send deferred DHT update 
response, node left [" +
+                            "futIds=" + msg.futureVersions() + ", node=" + 
nodeId + ']');
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send deferred DHT update response 
to remote node [" +
+                        "futIds=" + msg.futureVersions() + ", node=" + nodeId 
+ ']', e);
+                }
+            }
+        };
+
         CacheMetricsImpl m = new CacheMetricsImpl(ctx);
 
         if (ctx.dht().near() != null)
@@ -405,8 +447,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
     /** {@inheritDoc} */
     @Override public void stop() {
-        for (DeferredResponseBuffer buf : pendingResponses.values())
-            buf.finish();
+        deferredUpdateMessageSender.stop();
     }
 
     /**
@@ -3208,28 +3249,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
      * @param ver Version to ack.
      */
     private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) 
{
-        while (true) {
-            DeferredResponseBuffer buf = pendingResponses.get(nodeId);
-
-            if (buf == null) {
-                buf = new DeferredResponseBuffer(nodeId);
-
-                DeferredResponseBuffer old = 
pendingResponses.putIfAbsent(nodeId, buf);
-
-                if (old == null) {
-                    // We have successfully added buffer to map.
-                    ctx.time().addTimeoutObject(buf);
-                }
-                else
-                    buf = old;
-            }
-
-            if (!buf.addResponse(ver))
-                // Some thread is sending filled up buffer, we can remove it.
-                pendingResponses.remove(nodeId, buf);
-            else
-                break;
-        }
+        deferredUpdateMessageSender.sendDeferredAckMessage(nodeId, ver);
     }
 
     /**
@@ -3452,149 +3472,4 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             return Collections.emptyList();
         }
     }
-
-    /**
-     * Deferred response buffer.
-     */
-    private class DeferredResponseBuffer extends ReentrantReadWriteLock 
implements GridTimeoutObject {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Filled atomic flag. */
-        private AtomicBoolean guard = new AtomicBoolean(false);
-
-        /** Response versions. */
-        private ConcurrentLinkedDeque8<GridCacheVersion> respVers = new 
ConcurrentLinkedDeque8<>();
-
-        /** Node ID. */
-        private final UUID nodeId;
-
-        /** Timeout ID. */
-        private final IgniteUuid timeoutId;
-
-        /** End time. */
-        private final long endTime;
-
-        /**
-         * @param nodeId Node ID to send message to.
-         */
-        private DeferredResponseBuffer(UUID nodeId) {
-            this.nodeId = nodeId;
-
-            timeoutId = IgniteUuid.fromUuid(nodeId);
-
-            endTime = U.currentTimeMillis() + DEFERRED_UPDATE_RESPONSE_TIMEOUT;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteUuid timeoutId() {
-            return timeoutId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public long endTime() {
-            return endTime;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onTimeout() {
-            if (guard.compareAndSet(false, true)) {
-                ctx.closures().runLocalSafe(new Runnable() {
-                    @Override public void run() {
-                        writeLock().lock();
-
-                        try {
-                            finish();
-                        }
-                        finally {
-                            writeLock().unlock();
-                        }
-                    }
-                });
-            }
-        }
-
-        /**
-         * Adds deferred response to buffer.
-         *
-         * @param ver Version to send.
-         * @return {@code True} if response was handled, {@code false} if this 
buffer is filled and cannot be used.
-         */
-        public boolean addResponse(GridCacheVersion ver) {
-            readLock().lock();
-
-            boolean snd = false;
-
-            try {
-                if (guard.get())
-                    return false;
-
-                respVers.add(ver);
-
-                if (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && 
guard.compareAndSet(false, true))
-                    snd = true;
-            }
-            finally {
-                readLock().unlock();
-            }
-
-            if (snd) {
-                // Wait all threads in read lock to finish.
-                writeLock().lock();
-
-                try {
-                    finish();
-
-                    ctx.time().removeTimeoutObject(this);
-                }
-                finally {
-                    writeLock().unlock();
-                }
-            }
-
-            return true;
-        }
-
-        /**
-         * Sends deferred notification message and removes this buffer from 
pending responses map.
-         */
-        private void finish() {
-            GridDhtAtomicDeferredUpdateResponse msg = new 
GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
-                respVers, ctx.deploymentEnabled());
-
-            try {
-                ctx.kernalContext().gateway().readLock();
-
-                try {
-                    ctx.io().send(nodeId, msg, ctx.ioPolicy());
-
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("Sent deferred DHT update response 
[futIds=" + msg.futureVersions() +
-                            ", node=" + nodeId + ']');
-                    }
-                }
-                finally {
-                    ctx.kernalContext().gateway().readUnlock();
-                }
-            }
-            catch (IllegalStateException ignored) {
-                if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Failed to send deferred DHT update response, 
node is stopping [" +
-                        "futIds=" + msg.futureVersions() + ", node=" + nodeId 
+ ']');
-                }
-            }
-            catch (ClusterTopologyCheckedException ignored) {
-                if (msgLog.isDebugEnabled()) {
-                    msgLog.debug("Failed to send deferred DHT update response, 
node left [" +
-                        "futIds=" + msg.futureVersions() + ", node=" + nodeId 
+ ']');
-                }
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send deferred DHT update response to 
remote node [" +
-                    "futIds=" + msg.futureVersions() + ", node=" + nodeId + 
']', e);
-            }
-
-            pendingResponses.remove(nodeId, this);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index d251528..4cbfb27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -526,7 +526,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
     ) {
         GridCacheContext cacheCtx = entry.context();
 
-        List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
+        List<ClusterNode> nodes = cacheCtx.isLocal() ?
+            cacheCtx.affinity().nodes(entry.key(), topVer) :
+            cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer);
 
         txMapping.addMapping(nodes);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index e17a76c..91cfbda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -27,7 +27,6 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
@@ -599,9 +598,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
         GridCacheEntryEx cached0 = entry.cached();
 
         if (cached0.isDht())
-            nodes = cacheCtx.affinity().nodes(cached0.partition(), topVer);
+            nodes = cacheCtx.topology().nodes(cached0.partition(), topVer);
         else
-            nodes = cacheCtx.affinity().nodes(entry.key(), topVer);
+            nodes = cacheCtx.isLocal() ?
+                cacheCtx.affinity().nodes(entry.key(), topVer) :
+                cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer);
 
         txMapping.addMapping(nodes);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 34b8281..5c09398 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -193,7 +193,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
             GridCacheContext cacheCtx = txEntry.context();
 
-            List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer);
+            List<ClusterNode> nodes = cacheCtx.isLocal() ?
+                cacheCtx.affinity().nodes(txEntry.key(), topVer) :
+                cacheCtx.topology().nodes(cacheCtx.affinity().partition(txEntry.key()), topVer);
 
             ClusterNode primary = F.first(nodes);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index bb5d482..46604c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -34,6 +34,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
@@ -76,6 +78,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     public static final IgniteProductVersion PRIMARY_SYNC_TXS_SINCE = IgniteProductVersion.fromString("1.6.0");
 
     /** */
+    public static final IgniteProductVersion ACK_DHT_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.6.8");
+
+    /** */
     private static final long serialVersionUID = 0L;
 
     /** Logger reference. */
@@ -251,6 +256,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                         assert f.node().id().equals(nodeId);
 
+                        if (res.returnValue() != null)
+                            tx.implicitSingleResult(res.returnValue());
+
                         f.onDhtFinishResponse(res);
                     }
                 }
@@ -432,6 +440,50 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         catch (IgniteCheckedException e) {
             onDone(e);
         }
+        finally {
+            if (commit &&
+                tx.onePhaseCommit() &&
+                !tx.writeMap().isEmpty()) // Readonly operations require no ack.
+                ackBackup();
+        }
+    }
+
+    /**
+     *
+     */
+    private void ackBackup() {
+        if (mappings.empty())
+            return;
+
+        if (!tx.needReturnValue() || !tx.implicit())
+            return; // GridCacheReturn was not saved at backup.
+
+        GridDistributedTxMapping mapping = mappings.singleMapping();
+
+        if (mapping != null) {
+            UUID nodeId = mapping.node().id();
+
+            Collection<UUID> backups = tx.transactionNodes().get(nodeId);
+
+            if (!F.isEmpty(backups)) {
+                assert backups.size() == 1 : backups;
+
+                UUID backupId = F.first(backups);
+
+                ClusterNode backup = cctx.discovery().node(backupId);
+
+                // Nothing to do if backup has left the grid.
+                if (backup == null) {
+                    // No-op.
+                }
+                else if (backup.isLocal())
+                    cctx.tm().removeTxReturn(tx.xidVersion());
+                else {
+                    if (ACK_DHT_ONE_PHASE_SINCE.compareToIgnoreTimestamp(backup.version()) <= 0)
+                        cctx.tm().sendDeferredAckResponse(backupId, tx.xidVersion());
+                }
+            }
+        }
     }
 
     /**
@@ -475,23 +527,48 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                         readyNearMappingFromBackup(mapping);
 
                         if (committed) {
-                            if (tx.syncMode() == FULL_SYNC) {
-                                GridCacheVersion nearXidVer = tx.nearXidVersion();
+                            try {
+                                if (tx.needReturnValue() && tx.implicit()) {
+                                    GridCacheReturnCompletableWrapper wrapper =
+                                        cctx.tm().getCommittedTxReturn(tx.xidVersion());
 
-                                assert nearXidVer != null : tx;
+                                    assert wrapper != null : tx.xidVersion();
 
-                                IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer);
+                                    GridCacheReturn retVal = wrapper.fut().get();
 
-                                fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                                    @Override public void apply(IgniteInternalFuture<?> fut) {
-                                        mini.onDone(tx);
-                                    }
-                                });
+                                    assert retVal != null;
+
+                                    tx.implicitSingleResult(retVal);
+                                }
 
-                                return;
+                                if (tx.syncMode() == FULL_SYNC) {
+                                    GridCacheVersion nearXidVer = tx.nearXidVersion();
+
+                                    assert nearXidVer != null : tx;
+
+                                    IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer);
+
+                                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                                        @Override public void apply(IgniteInternalFuture<?> fut) {
+                                            mini.onDone(tx);
+                                        }
+                                    });
+
+                                    return;
+                                }
+
+                                mini.onDone(tx);
                             }
+                            catch (IgniteCheckedException e) {
+                                if (msgLog.isDebugEnabled()) {
+                                    msgLog.debug("Near finish fut, failed to finish [" +
+                                        "txId=" + tx.nearXidVersion() +
+                                        ", node=" + backup.id() +
+                                        ", err=" + e + ']');
+                                }
 
-                            mini.onDone(tx);
+                                mini.onDone(e);
+                            }
                         }
                         else {
                             ClusterTopologyCheckedException cause =
@@ -504,7 +581,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                         }
                     }
                     else {
-                        GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId());
+                        GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId(), false);
 
                         // Preserve old behavior, otherwise response is not sent.
                         if (WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) > 0)
@@ -765,9 +842,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
     /**
      * @param miniId Mini future ID.
+     * @param waitRemoteTxs Wait for remote txs.
      * @return Finish request.
      */
-    private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId) {
+    private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId, boolean waitRemoteTxs) {
         GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
             cctx.localNodeId(),
             futureId(),
@@ -791,7 +869,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
             0,
             null,
             0,
-            tx.activeCachesDeploymentEnabled());
+            tx.activeCachesDeploymentEnabled(),
+            !waitRemoteTxs && (tx.needReturnValue() && tx.implicit()),
+            waitRemoteTxs);
 
         finishReq.checkCommitted(true);
 
@@ -872,9 +952,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                             add(mini);
 
-                            GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId());
-
-                            req.waitRemoteTransactions(true);
+                            GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId(), true);
 
                             for (UUID backupId : backups) {
                                 ClusterNode backup = cctx.discovery().node(backupId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index eb2989e..18c3011 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
@@ -151,6 +152,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     @GridToStringExclude
     protected GridCacheSharedContext<?, ?> cctx;
 
+    /** Need return value. */
+    protected boolean needRetVal;
+
     /**
      * End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)
      * assigned to this transaction at the end of write phase.
@@ -695,6 +699,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     }
 
     /**
+     * @return Flag indicating whether transaction needs return value.
+     */
+    public boolean needReturnValue() {
+        return needRetVal;
+    }
+
+    /**
+     * @param needRetVal Need return value flag.
+     */
+    public void needReturnValue(boolean needRetVal) {
+        this.needRetVal = needRetVal;
+    }
+
+    /**
      * Gets remaining allowed transaction time.
      *
      * @return Remaining transaction time. {@code 0} if timeout isn't specified. {@code -1} if time is out.
@@ -1285,7 +1303,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
                         if (intercept || !F.isEmpty(e.entryProcessors()))
                             e.cached().unswap(false);
 
-                        IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false);
+                        IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false, null);
 
                         GridCacheContext cacheCtx = e.context();
 
@@ -1443,13 +1461,15 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     /**
      * @param txEntry Entry to process.
      * @param metrics {@code True} if metrics should be updated.
+     * @param ret Optional return value to initialize.
      * @return Tuple containing transformation results.
      * @throws IgniteCheckedException If failed to get previous value for transform.
      * @throws GridCacheEntryRemovedException If entry was concurrently deleted.
      */
     protected IgniteBiTuple<GridCacheOperation, CacheObject> applyTransformClosures(
         IgniteTxEntry txEntry,
-        boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException {
+        boolean metrics,
+        @Nullable GridCacheReturn ret) throws GridCacheEntryRemovedException, IgniteCheckedException {
         GridCacheContext cacheCtx = txEntry.context();
 
         assert cacheCtx != null;
@@ -1457,8 +1477,12 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         if (isSystemInvalidate())
             return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null);
 
-        if (F.isEmpty(txEntry.entryProcessors()))
+        if (F.isEmpty(txEntry.entryProcessors())) {
+            if (ret != null)
+                ret.value(cacheCtx, txEntry.value(), txEntry.keepBinary());
+
             return F.t(txEntry.op(), txEntry.value());
+        }
         else {
             T2<GridCacheOperation, CacheObject> calcVal = txEntry.entryProcessorCalculatedValue();
 
@@ -1508,17 +1532,27 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
                 CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(
                     txEntry.key(), key, cacheVal, val, ver, keepBinary, txEntry.cached());
 
+                Object procRes = null;
+                Exception err = null;
+
                 try {
                     EntryProcessor<Object, Object, Object> processor = t.get1();
 
-                    processor.process(invokeEntry, t.get2());
+                    procRes = processor.process(invokeEntry, t.get2());
 
                     val = invokeEntry.getValue();
 
                     key = invokeEntry.key();
                 }
-                catch (Exception ignore) {
-                    // No-op.
+                catch (Exception e) {
+                    err = e;
+                }
+
+                if (ret != null) {
+                    if (err != null || procRes != null)
+                        ret.addEntryProcessResult(txEntry.context(), txEntry.key(), null, procRes, err, keepBinary);
+                    else
+                        ret.invokeResult(true);
                 }
 
                 modified |= invokeEntry.modified();

Reply via email to