ignite-5272 For atomic updates, do not use GridDiscoveryManager.hasNearCache; 
instead, always check readers on the primary node and update the expected 
mapping on the near node.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/851b9ad5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/851b9ad5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/851b9ad5

Branch: refs/heads/ignite-5272
Commit: 851b9ad584097bb9aa0ebc984100aec6b95d0965
Parents: 5d98cce
Author: sboikov <[email protected]>
Authored: Sat Jun 10 12:44:15 2017 +0300
Committer: sboikov <[email protected]>
Committed: Sat Jun 10 12:44:15 2017 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtCacheEntry.java      |  30 ++-
 .../GridDhtAtomicAbstractUpdateFuture.java      |  76 +++++--
 .../GridDhtAtomicAbstractUpdateRequest.java     |  22 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  45 ++---
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |   6 +-
 .../GridDhtAtomicSingleUpdateRequest.java       |  14 ++
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   4 +-
 ...idNearAtomicAbstractSingleUpdateRequest.java |  19 +-
 .../GridNearAtomicAbstractUpdateFuture.java     | 200 ++++++++++++++-----
 .../GridNearAtomicAbstractUpdateRequest.java    |  86 ++++++--
 .../atomic/GridNearAtomicFullUpdateRequest.java |  24 +--
 ...GridNearAtomicSingleUpdateFilterRequest.java |  23 +--
 .../GridNearAtomicSingleUpdateFuture.java       |  42 ++--
 ...GridNearAtomicSingleUpdateInvokeRequest.java |  23 +--
 .../GridNearAtomicSingleUpdateRequest.java      |  20 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  46 +++--
 .../atomic/GridNearAtomicUpdateResponse.java    |  24 +--
 .../atomic/IgniteCacheAtomicProtocolTest.java   | 128 ++++++++++++
 18 files changed, 548 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 2e86fb0..05831b1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -625,6 +625,15 @@ public class GridDhtCacheEntry extends 
GridDistributedCacheEntry {
     }
 
     /**
+     * @return Readers.
+     */
+    @Nullable public ReaderId[] readersLocked() {
+        assert Thread.holdsLock(this);
+
+        return this.rdrs;
+    }
+
+    /**
      * @return Collection of readers after check.
      * @throws GridCacheEntryRemovedException If removed.
      */
@@ -739,7 +748,7 @@ public class GridDhtCacheEntry extends 
GridDistributedCacheEntry {
     /**
      * Reader ID.
      */
-    private static class ReaderId {
+    public static class ReaderId {
         /** */
         private static final ReaderId[] EMPTY_ARRAY = new ReaderId[0];
 
@@ -765,9 +774,26 @@ public class GridDhtCacheEntry extends 
GridDistributedCacheEntry {
         }
 
         /**
+         * @param readers Readers array.
+         * @param nodeId Node ID to check.
+         * @return {@code True} if node ID found in readers array.
+         */
+        public static boolean contains(@Nullable ReaderId[] readers, UUID 
nodeId) {
+            if (readers == null)
+                return false;
+
+            for (int i = 0; i < readers.length; i++) {
+                if (nodeId.equals(readers[i].nodeId))
+                    return true;
+            }
+
+            return false;
+        }
+
+        /**
          * @return Node ID.
          */
-        UUID nodeId() {
+        public UUID nodeId() {
             return nodeId;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5c7c027..fb58e71 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -92,6 +92,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridCacheFutureA
     /** Response count. */
     private volatile int resCnt;
 
+    /** */
+    private boolean addedReader;
+
     /**
      * @param cctx Cache context.
      * @param writeVer Write version.
@@ -175,7 +178,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridCacheFutureA
 
         List<ClusterNode> affNodes = affAssignment.get(entry.partition());
 
-        List<ClusterNode> dhtNodes = 
cctx.dht().topology().nodes(entry.partition(), affAssignment, affNodes);
+        // Client has seen that rebalancing finished, it is safe to use 
affinity mapping.
+        List<ClusterNode> dhtNodes = updateReq.affinityMapping() ?
+            affNodes : cctx.dht().topology().nodes(entry.partition(), 
affAssignment, affNodes);
 
         if (dhtNodes == null)
             dhtNodes = affNodes;
@@ -232,9 +237,10 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridCacheFutureA
      * @param key Key.
      * @param readers Near cache readers.
      */
-    protected abstract void addNearKey(KeyCacheObject key, Collection<UUID> 
readers);
+    protected abstract void addNearKey(KeyCacheObject key, 
GridDhtCacheEntry.ReaderId[] readers);
 
     /**
+     * @param nearNode Near node.
      * @param readers Entry readers.
      * @param entry Entry.
      * @param val Value.
@@ -243,27 +249,43 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridCacheFutureA
      * @param expireTime Expire time for near cache update (optional).
      */
     final void addNearWriteEntries(
-        Collection<UUID> readers,
+        ClusterNode nearNode,
+        GridDhtCacheEntry.ReaderId[] readers,
         GridDhtCacheEntry entry,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long expireTime) {
+        assert readers != null;
+
         CacheWriteSynchronizationMode syncMode = 
updateReq.writeSynchronizationMode();
 
         addNearKey(entry.key(), readers);
 
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
-        for (UUID nodeId : readers) {
-            GridDhtAtomicAbstractUpdateRequest updateReq = 
mappings.get(nodeId);
+        for (int i = 0; i < readers.length; i++) {
+            GridDhtCacheEntry.ReaderId reader = readers[i];
+
+            if (nearNode.id().equals(reader.nodeId()))
+                continue;
+
+            GridDhtAtomicAbstractUpdateRequest updateReq = 
mappings.get(reader.nodeId());
 
             if (updateReq == null) {
-                ClusterNode node = cctx.discovery().node(nodeId);
+                ClusterNode node = cctx.discovery().node(reader.nodeId());
 
                 // Node left the grid.
-                if (node == null)
+                if (node == null) {
+                    try {
+                        entry.removeReader(reader.nodeId(), -1L);
+                    }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        assert false; // Assume hold entry lock.
+                    }
+
                     continue;
+                }
 
                 updateReq = createRequest(
                     node.id(),
@@ -275,7 +297,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridCacheFutureA
                     expireTime,
                     null);
 
-                mappings.put(nodeId, updateReq);
+                mappings.put(node.id(), updateReq);
+
+                addedReader = true;
             }
 
             updateReq.addNearWriteValue(entry.key(),
@@ -361,7 +385,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridCacheFutureA
         GridNearAtomicUpdateResponse updateRes,
         GridDhtAtomicCache.UpdateReplyClosure completionCb) {
         if (F.isEmpty(mappings)) {
-            updateRes.dhtNodes(Collections.<UUID>emptyList());
+            updateRes.mapping(Collections.<UUID>emptyList());
 
             completionCb.apply(updateReq, updateRes);
 
@@ -372,18 +396,31 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridCacheFutureA
 
         boolean needReplyToNear = updateReq.writeSynchronizationMode() == 
PRIMARY_SYNC ||
             !ret.emptyResult() ||
-            updateRes.nearVersion() != null ||
+            updateReq.nearCache() ||
             cctx.localNodeId().equals(nearNode.id());
 
         boolean needMapping = updateReq.fullSync() && 
(updateReq.needPrimaryResponse() || !sendAllToDht());
 
-        if (needMapping) {
+        boolean readersOnlyNodes = false;
+
+        if (!updateReq.needPrimaryResponse() && addedReader) {
+            for (GridDhtAtomicAbstractUpdateRequest dhtReq : 
mappings.values()) {
+                if (dhtReq.nearSize() > 0 && dhtReq.size() == 0) {
+                    readersOnlyNodes = true;
+
+                    break;
+                }
+            }
+        }
+
+        if (needMapping || readersOnlyNodes) {
             initMapping(updateRes);
 
             needReplyToNear = true;
         }
 
-        sendDhtRequests(nearNode, ret);
+        // If there are readers updates then nearNode should not finish before 
primary response received.
+        sendDhtRequests(nearNode, ret, !readersOnlyNodes);
 
         if (needReplyToNear)
             completionCb.apply(updateReq, updateRes);
@@ -393,24 +430,25 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridCacheFutureA
      * @param updateRes Response.
      */
     private void initMapping(GridNearAtomicUpdateResponse updateRes) {
-        List<UUID> dhtNodes;
+        List<UUID> mapping;
 
         if (!F.isEmpty(mappings)) {
-            dhtNodes = new ArrayList<>(mappings.size());
+            mapping = new ArrayList<>(mappings.size());
 
-            dhtNodes.addAll(mappings.keySet());
+            mapping.addAll(mappings.keySet());
         }
         else
-            dhtNodes = Collections.emptyList();
+            mapping = Collections.emptyList();
 
-        updateRes.dhtNodes(dhtNodes);
+        updateRes.mapping(mapping);
     }
 
     /**
      * @param nearNode Near node.
+     * @param sndRes {@code True} if allow to send result from DHT nodes.
      * @param ret Return value.
      */
-    private void sendDhtRequests(ClusterNode nearNode, GridCacheReturn ret) {
+    private void sendDhtRequests(ClusterNode nearNode, GridCacheReturn ret, 
boolean sndRes) {
         for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
             try {
                 assert !cctx.localNodeId().equals(req.nodeId()) : req;
@@ -418,7 +456,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridCacheFutureA
                 if (updateReq.fullSync()) {
                     req.nearReplyInfo(nearNode.id(), updateReq.futureId());
 
-                    if (ret.emptyResult())
+                    if (sndRes && ret.emptyResult())
                         req.hasResult(true);
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index d2dc817..a50b68c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -42,13 +42,13 @@ import org.jetbrains.annotations.Nullable;
  */
 public abstract class GridDhtAtomicAbstractUpdateRequest extends 
GridCacheIdMessage implements GridCacheDeployable {
     /** Skip store flag bit mask. */
-    private static final int DHT_ATOMIC_SKIP_STORE_FLAG_MASK = 0x01;
+    protected static final int DHT_ATOMIC_SKIP_STORE_FLAG_MASK = 0x01;
 
     /** Keep binary flag. */
-    private static final int DHT_ATOMIC_KEEP_BINARY_FLAG_MASK = 0x02;
+    protected static final int DHT_ATOMIC_KEEP_BINARY_FLAG_MASK = 0x02;
 
     /** Near cache key flag. */
-    private static final int DHT_ATOMIC_NEAR_FLAG_MASK = 0x04;
+    protected static final int DHT_ATOMIC_NEAR_FLAG_MASK = 0x04;
 
     /** */
     static final int DHT_ATOMIC_HAS_RESULT_MASK = 0x08;
@@ -451,20 +451,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest 
extends GridCacheIdMess
     @Nullable public abstract Object[] invokeArguments();
 
     /**
-     * @return {@code True} if near cache update request.
-     */
-    protected final boolean near() {
-        return isFlag(DHT_ATOMIC_NEAR_FLAG_MASK);
-    }
-
-    /**
-     * @param near Near cache update flag.
-     */
-    protected final void near(boolean near) {
-        setFlag(near, DHT_ATOMIC_NEAR_FLAG_MASK);
-    }
-
-    /**
      * Sets flag mask.
      *
      * @param flag Set or clear.
@@ -663,7 +649,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest 
extends GridCacheIdMess
             appendFlag(flags, "skipStore");
         if (keepBinary())
             appendFlag(flags, "keepBinary");
-        if (near())
+        if (isFlag(DHT_ATOMIC_NEAR_FLAG_MASK))
             appendFlag(flags, "near");
         if (hasResult())
             appendFlag(flags, "hasRes");

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 443b1b1..67e3ebc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1796,7 +1796,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
                         locked = lockEntries(req, req.topologyVersion());
 
-                        boolean hasNear = ctx.discovery().cacheNearNode(node, 
name());
+                        boolean hasNear = req.nearCache();
 
                         // Assign next version for update inside entries lock.
                         GridCacheVersion ver = 
ctx.versions().next(top.topologyVersion());
@@ -2406,8 +2406,6 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         AffinityTopologyVersion topVer = req.topologyVersion();
 
-        boolean checkReaders = hasNear || 
ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
-
         boolean intercept = ctx.config().getInterceptor() != null;
 
         AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
@@ -2431,13 +2429,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
                 Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : 
req.writeValue(i);
 
-                Collection<UUID> readers = null;
-                Collection<UUID> filteredReaders = null;
-
-                if (checkReaders) {
-                    readers = entry.readers();
-                    filteredReaders = F.view(entry.readers(), 
F.notEqualTo(nearNode.id()));
-                }
+                // Get readers before innerUpdate (reader cleared after 
remove).
+                GridDhtCacheEntry.ReaderId[] readers = entry.readersLocked();
 
                 GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                     ver,
@@ -2492,9 +2485,10 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                             updRes.oldValue(),
                             updRes.updateCounter());
 
-                        if (!F.isEmpty(filteredReaders))
+                        if (readers != null)
                             dhtFut.addNearWriteEntries(
-                                filteredReaders,
+                                nearNode,
+                                readers,
                                 entry,
                                 updRes.newValue(),
                                 entryProcessor,
@@ -2522,13 +2516,16 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                                 res.addNearTtl(i, updRes.newTtl(), 
updRes.conflictExpireTime());
 
                             if (updRes.newValue() != null) {
-                                IgniteInternalFuture<Boolean> f = 
entry.addReader(nearNode.id(), req.messageId(), topVer);
+                                IgniteInternalFuture<Boolean> f =
+                                    entry.addReader(nearNode.id(), 
req.messageId(), topVer);
 
                                 assert f == null : f;
                             }
                         }
-                        else if (F.contains(readers, nearNode.id())) // Reader 
became primary or backup.
+                        else if (GridDhtCacheEntry.ReaderId.contains(readers, 
nearNode.id())) {
+                            // Reader became primary or backup.
                             entry.removeReader(nearNode.id(), req.messageId());
+                        }
                         else
                             res.addSkippedIndex(i);
                     }
@@ -2627,8 +2624,6 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         AffinityTopologyVersion topVer = req.topologyVersion();
 
-        boolean checkReaders = hasNear || 
ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
-
         CacheStorePartialUpdateException storeErr = null;
 
         try {
@@ -2688,13 +2683,8 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
                     assert writeVal != null || op == DELETE : "null write 
value found.";
 
-                    Collection<UUID> readers = null;
-                    Collection<UUID> filteredReaders = null;
-
-                    if (checkReaders) {
-                        readers = entry.readers();
-                        filteredReaders = F.view(entry.readers(), 
F.notEqualTo(nearNode.id()));
-                    }
+                    // Get readers before innerUpdate (reader cleared after 
remove).
+                    GridDhtCacheEntry.ReaderId[] readers = 
entry.readersLocked();
 
                     GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                         ver,
@@ -2764,9 +2754,10 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                             updRes.oldValue(),
                             updRes.updateCounter());
 
-                        if (!F.isEmpty(filteredReaders))
+                        if (readers != null)
                             dhtFut.addNearWriteEntries(
-                                filteredReaders,
+                                nearNode,
+                                readers,
                                 entry,
                                 writeVal,
                                 entryProcessor,
@@ -2793,8 +2784,10 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                                 assert f == null : f;
                             }
                         }
-                        else if (readers.contains(nearNode.id())) // Reader 
became primary or backup.
+                        else if (GridDhtCacheEntry.ReaderId.contains(readers, 
nearNode.id())) {
+                            // Reader became primary or backup.
                             entry.removeReader(nearNode.id(), req.messageId());
+                        }
                         else
                             res.addSkippedIndex(firstEntryIdx + i);
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index f053d21..595e455 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -25,6 +24,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -67,9 +67,9 @@ class GridDhtAtomicSingleUpdateFuture extends 
GridDhtAtomicAbstractUpdateFuture
     }
 
     /** {@inheritDoc} */
-    @Override protected void addNearKey(KeyCacheObject key, Collection<UUID> 
readers) {
+    @Override protected void addNearKey(KeyCacheObject key, 
GridDhtCacheEntry.ReaderId[] readers) {
         if (mappings == null)
-            mappings = U.newHashMap(readers.size());
+            mappings = U.newHashMap(readers.length);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index 8931c24..0ade243 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -147,6 +147,20 @@ public class GridDhtAtomicSingleUpdateRequest extends 
GridDhtAtomicAbstractUpdat
     }
 
     /**
+     * @return {@code True} if near cache update request.
+     */
+    private boolean near() {
+        return isFlag(DHT_ATOMIC_NEAR_FLAG_MASK);
+    }
+
+    /**
+     * @param near Near cache update flag.
+     */
+    private void near(boolean near) {
+        setFlag(near, DHT_ATOMIC_NEAR_FLAG_MASK);
+    }
+
+    /**
      * @param key Key to add.
      * @param val Value, {@code null} if should be removed.
      * @param entryProcessor Entry processor.

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 2a84445..2124330 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
@@ -25,6 +24,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -66,7 +66,7 @@ class GridDhtAtomicUpdateFuture extends 
GridDhtAtomicAbstractUpdateFuture {
     }
 
     /** {@inheritDoc} */
-    @Override protected void addNearKey(KeyCacheObject key, Collection<UUID> 
readers) {
+    @Override protected void addNearKey(KeyCacheObject key, 
GridDhtCacheEntry.ReaderId[] readers) {
         // No-op.
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
index 9a6909a..7022561 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
@@ -51,14 +51,11 @@ public abstract class 
GridNearAtomicAbstractSingleUpdateRequest extends GridNear
      * @param nodeId Node ID.
      * @param futId Future ID.
      * @param topVer Topology version.
-     * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
      * @param op Cache update operation.
-     * @param retval Return value required flag.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
-     * @param skipStore Skip write-through to a persistent storage.
-     * @param keepBinary Keep binary flag.
+     * @param flags Flags.
      * @param addDepInfo Deployment info flag.
      */
     protected GridNearAtomicAbstractSingleUpdateRequest(
@@ -66,32 +63,22 @@ public abstract class 
GridNearAtomicAbstractSingleUpdateRequest extends GridNear
         UUID nodeId,
         long futId,
         @NotNull AffinityTopologyVersion topVer,
-        boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
         GridCacheOperation op,
-        boolean retval,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean mappingKnown,
-        boolean skipStore,
-        boolean keepBinary,
-        boolean recovery,
+        byte flags,
         boolean addDepInfo
     ) {
         super(cacheId,
             nodeId,
             futId,
             topVer,
-            topLocked,
             syncMode,
             op,
-            retval,
             subjId,
             taskNameHash,
-            mappingKnown,
-            skipStore,
-            keepBinary,
-            recovery,
+            flags,
             addDepInfo);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 6969971..1bd8ec5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -20,9 +20,10 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -473,17 +474,41 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
     /**
      *
      */
+    static class NodeResult {
+        /** */
+        boolean rcvd;
+
+        /**
+         * @param rcvd Result received flag.
+         */
+        NodeResult(boolean rcvd) {
+            this.rcvd = rcvd;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "Result [rcvd=" + rcvd + ']';
+        }
+    }
+
+    /**
+     *
+     */
     static class PrimaryRequestState {
         /** */
         final GridNearAtomicAbstractUpdateRequest req;
 
         /** */
         @GridToStringInclude
-        Set<UUID> dhtNodes;
+        Map<UUID, NodeResult> mappedNodes;
+
+        /** */
+        @GridToStringInclude
+        private int expCnt = -1;
 
         /** */
         @GridToStringInclude
-        private Set<UUID> rcvd;
+        private int rcvdCnt;
 
         /** */
         private boolean hasRes;
@@ -501,24 +526,39 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
             if (req.initMappingLocally()) {
                 if (single) {
                     if (nodes.size() > 1) {
-                        dhtNodes = U.newHashSet(nodes.size() - 1);
+                        mappedNodes = U.newHashMap(nodes.size() - 1);
 
                         for (int i = 1; i < nodes.size(); i++)
-                            dhtNodes.add(nodes.get(i).id());
+                            mappedNodes.put(nodes.get(i).id(), new 
NodeResult(false));
                     }
                     else
-                        dhtNodes = Collections.emptySet();
+                        mappedNodes = Collections.emptyMap();
                 }
                 else {
-                    dhtNodes = new HashSet<>();
+                    mappedNodes = new HashMap<>();
 
                     for (int i = 1; i < nodes.size(); i++)
-                        dhtNodes.add(nodes.get(i).id());
+                        mappedNodes.put(nodes.get(i).id(), new 
NodeResult(false));
                 }
+
+                expCnt = mappedNodes.size();
             }
         }
 
         /**
+         * Discards the locally initialized mapping: forgets mapped nodes, resets the expected
+         * result count to {@code -1} and marks the request as needing a primary response.
+         * Valid only for requests which initialize the mapping locally.
+         */
+        void resetLocalMapping() {
+            assert req.initMappingLocally() : req;
+
+            mappedNodes = null;
+
+            expCnt = -1;
+
+            req.needPrimaryResponse(true);
+        }
+
+        /**
          * @return Primary node ID.
          */
         UUID primaryId() {
@@ -532,7 +572,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
             assert req.initMappingLocally();
 
             for (int i = 1; i < nodes.size(); i++)
-                dhtNodes.add(nodes.get(i).id());
+                mappedNodes.put(nodes.get(i).id(), new NodeResult(false));
+
+            expCnt = mappedNodes.size();
         }
 
         /**
@@ -547,11 +589,18 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
 
             boolean finished = false;
 
-            for (Iterator<UUID> it = dhtNodes.iterator(); it.hasNext();) {
-                UUID nodeId = it.next();
+            for (Map.Entry<UUID, NodeResult> e : mappedNodes.entrySet()) {
+                NodeResult res = e.getValue();
+
+                if (res.rcvd)
+                    continue;
+
+                UUID nodeId = e.getKey();
 
                 if (!cctx.discovery().alive(nodeId)) {
-                    it.remove();
+                    res.rcvd = true;
+
+                    rcvdCnt++;
 
                     if (finished()) {
                         finished = true;
@@ -564,7 +613,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
             if (finished)
                 return DhtLeftResult.DONE;
 
-            if (dhtNodes.isEmpty())
+            if (rcvdCnt == expCnt)
                 return !req.needPrimaryResponse() ? 
DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
 
             return DhtLeftResult.NOT_DONE;
@@ -577,7 +626,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
             if (req.writeSynchronizationMode() == PRIMARY_SYNC)
                 return hasRes;
 
-            return (dhtNodes != null && dhtNodes.isEmpty()) && hasRes;
+            return (expCnt == rcvdCnt) && hasRes;
         }
 
         /**
@@ -622,14 +671,23 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
          * @return {@code True} if request processing finished.
          */
         DhtLeftResult onDhtNodeLeft(UUID nodeId) {
-            if (req.writeSynchronizationMode() != FULL_SYNC || dhtNodes == 
null || finished())
+            if (req.writeSynchronizationMode() != FULL_SYNC || mappedNodes == 
null || finished())
                 return DhtLeftResult.NOT_DONE;
 
-            if (dhtNodes.remove(nodeId) && dhtNodes.isEmpty()) {
-                if (hasRes)
-                    return DhtLeftResult.DONE;
-                else
-                    return !req.needPrimaryResponse() ? 
DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
+            NodeResult res = mappedNodes.get(nodeId);
+
+            if (res != null && !res.rcvd) {
+                res.rcvd = true;
+
+                rcvdCnt++;
+
+                if (rcvdCnt == expCnt) {
+                    if (hasRes)
+                        return DhtLeftResult.DONE;
+                    else
+                        return !req.needPrimaryResponse() ?
+                            DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : 
DhtLeftResult.NOT_DONE;
+                }
             }
 
             return DhtLeftResult.NOT_DONE;
@@ -649,16 +707,38 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
             if (res.hasResult())
                 hasRes = true;
 
-            if (dhtNodes == null) {
-                if (rcvd == null)
-                    rcvd = new HashSet<>();
+            if (mappedNodes == null) {
+                assert expCnt == -1 : expCnt;
+
+                mappedNodes = new HashMap<>();
+
+                mappedNodes.put(nodeId, new NodeResult(true));
 
-                rcvd.add(nodeId);
+                rcvdCnt++;
 
                 return false;
             }
 
-            return dhtNodes.remove(nodeId) && finished();
+            NodeResult nodeRes = mappedNodes.get(nodeId);
+
+            if (nodeRes != null) {
+                if (nodeRes.rcvd)
+                    return false;
+
+                nodeRes.rcvd = true;
+
+                rcvdCnt++;
+            }
+            else {
+                if (!hasRes) // Do not finish future until primary response 
received and mapping is known.
+                    expCnt = -1;
+
+                mappedNodes.put(nodeId, new NodeResult(true));
+
+                rcvdCnt++;
+            }
+
+            return finished();
         }
 
         /**
@@ -676,15 +756,15 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
             assert onRes;
 
             if (res.error() != null || res.remapTopologyVersion() != null) {
-                dhtNodes = Collections.emptySet(); // Mark as finished.
+                expCnt = -1; // Mark as finished.
 
                 return true;
             }
 
             assert res.returnValue() != null : res;
 
-            if (res.dhtNodes() != null)
-                initDhtNodes(res.dhtNodes(), cctx);
+            if (res.mapping() != null)
+                initMapping(res.mapping(), cctx);
 
             return finished();
         }
@@ -693,39 +773,61 @@ public abstract class GridNearAtomicAbstractUpdateFuture 
extends GridCacheFuture
          * @param nodeIds Node IDs.
          * @param cctx Context.
          */
-        private void initDhtNodes(List<UUID> nodeIds, GridCacheContext cctx) {
-            assert dhtNodes == null || req.initMappingLocally();
+        private void initMapping(List<UUID> nodeIds, GridCacheContext cctx) {
+            assert rcvdCnt <= nodeIds.size();
 
-            Set<UUID> dhtNodes0 = dhtNodes;
+            expCnt = nodeIds.size();
 
-            dhtNodes = null;
+            if (mappedNodes == null)
+                mappedNodes = U.newHashMap(nodeIds.size());
 
-            for (UUID dhtNodeId : nodeIds) {
-                if (F.contains(rcvd, dhtNodeId))
-                    continue;
+            for (int i = 0; i < nodeIds.size(); i++) {
+                UUID nodeId = nodeIds.get(i);
 
-                if (req.initMappingLocally() && !F.contains(dhtNodes0, 
dhtNodeId))
-                    continue;
+                if (!mappedNodes.containsKey(nodeId)) {
+                    NodeResult res = new NodeResult(false);
 
-                if (cctx.discovery().node(dhtNodeId) != null) {
-                    if (dhtNodes == null)
-                        dhtNodes = U.newHashSet(nodeIds.size());
+                    mappedNodes.put(nodeId, res);
 
-                    dhtNodes.add(dhtNodeId);
+                    if (cctx.discovery().node(nodeId) == null) {
+                        res.rcvd = true;
+
+                        rcvdCnt++;
+                    }
                 }
             }
-
-            if (dhtNodes == null)
-                dhtNodes = Collections.emptySet();
         }
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(PrimaryRequestState.class, this,
-                "primary", primaryId(),
-                "needPrimaryRes", req.needPrimaryResponse(),
-                "primaryRes", req.response() != null,
-                "done", finished());
+            Set<UUID> rcvd = null;
+            Set<UUID> nonRcvd = null;
+
+            if (mappedNodes != null) {
+                for (Map.Entry<UUID, NodeResult> e : mappedNodes.entrySet()) {
+                    if (e.getValue().rcvd) {
+                        if (rcvd == null)
+                            rcvd = new HashSet<>();
+
+                        rcvd.add(e.getKey());
+                    }
+                    else {
+                        if (nonRcvd == null)
+                            nonRcvd = new HashSet<>();
+
+                        nonRcvd.add(e.getKey());
+                    }
+                }
+            }
+
+            return "Primary [id=" + primaryId() +
+                ", opRes=" + hasRes +
+                ", expCnt=" + expCnt +
+                ", rcvdCnt=" + rcvdCnt +
+                ", primaryRes=" + (req.response() != null) +
+                ", done=" + finished() +
+                ", waitFor=" + nonRcvd +
+                ", rcvd=" + rcvd + ']';
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index bb47af4..62618f8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -66,6 +66,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest 
extends GridCacheIdMes
     /** Recovery value flag. */
     private static final int RECOVERY_FLAG_MASK = 0x20;
 
+    /** Flag set when near cache is enabled on the node that initiated the operation. */
+    private static final int NEAR_CACHE_FLAG_MASK = 0x40;
+
+    /** Flag set when the originating node expects the update to be mapped using current affinity. */
+    private static final int AFFINITY_MAPPING_FLAG_MASK = 0x80;
+
     /** Target node ID. */
     @GridDirectTransient
     protected UUID nodeId;
@@ -110,15 +116,11 @@ public abstract class GridNearAtomicAbstractUpdateRequest 
extends GridCacheIdMes
      * @param nodeId Node ID.
      * @param futId Future ID.
      * @param topVer Topology version.
-     * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
      * @param op Cache update operation.
-     * @param retval Return value required flag.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
-     * @param needPrimaryRes {@code True} if near node waits for primary 
response.
-     * @param skipStore Skip write-through to a persistent storage.
-     * @param keepBinary Keep binary flag.
+     * @param flags Flags.
      * @param addDepInfo Deployment info flag.
      */
     protected GridNearAtomicAbstractUpdateRequest(
@@ -126,16 +128,11 @@ public abstract class GridNearAtomicAbstractUpdateRequest 
extends GridCacheIdMes
         UUID nodeId,
         long futId,
         @NotNull AffinityTopologyVersion topVer,
-        boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
         GridCacheOperation op,
-        boolean retval,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean needPrimaryRes,
-        boolean skipStore,
-        boolean keepBinary,
-        boolean recovery,
+        byte flags,
         boolean addDepInfo
     ) {
         this.cacheId = cacheId;
@@ -146,20 +143,73 @@ public abstract class GridNearAtomicAbstractUpdateRequest 
extends GridCacheIdMes
         this.op = op;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
+        this.flags = flags;
         this.addDepInfo = addDepInfo;
+    }
+
+    /**
+     * Packs the given boolean options into a single flags byte, one bit mask per option.
+     *
+     * @param nearCache {@code True} if near cache enabled on originating node.
+     * @param topLocked Topology locked flag.
+     * @param retval Return value required flag.
+     * @param affMapping {@code True} if originating node detected that rebalancing finished and
+     *    expects that update is mapped using current affinity.
+     * @param needPrimaryRes {@code True} if near node waits for primary response.
+     * @param skipStore Skip write-through to a CacheStore flag.
+     * @param keepBinary Keep binary flag.
+     * @param recovery Recovery mode flag.
+     * @return Flags.
+     */
+    static byte flags(
+        boolean nearCache,
+        boolean topLocked,
+        boolean retval,
+        boolean affMapping,
+        boolean needPrimaryRes,
+        boolean skipStore,
+        boolean keepBinary,
+        boolean recovery) {
+        byte flags = 0;
+
+        if (nearCache)
+            flags |= NEAR_CACHE_FLAG_MASK;
 
-        if (needPrimaryRes)
-            needPrimaryResponse(true);
         if (topLocked)
-            topologyLocked(true);
+            flags |= TOP_LOCKED_FLAG_MASK;
+
         if (retval)
-            returnValue(true);
+            flags |= RET_VAL_FLAG_MASK;
+
+        if (affMapping)
+            flags |= AFFINITY_MAPPING_FLAG_MASK;
+
+        if (needPrimaryRes)
+            flags |= NEED_PRIMARY_RES_FLAG_MASK;
+
         if (skipStore)
-            skipStore(true);
+            flags |= SKIP_STORE_FLAG_MASK;
+
         if (keepBinary)
-            keepBinary(true);
+            flags |= KEEP_BINARY_FLAG_MASK;
+
         if (recovery)
-            recovery(true);
+            flags |= RECOVERY_FLAG_MASK;
+
+        return flags;
+    }
+
+    /**
+     * @return {@code True} if originating node detected that rebalancing 
finished and
+     *    expects that update is mapped using current affinity.
+     */
+    boolean affinityMapping() {
+        return isFlag(AFFINITY_MAPPING_FLAG_MASK);
+    }
+
+    /**
+     * @return {@code True} if near cache is enabled on node initiated 
operation.
+     */
+    public boolean nearCache() {
+        return isFlag(NEAR_CACHE_FLAG_MASK);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index f175ab3..4b9aef0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -117,22 +117,19 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
 
     /**
      * Constructor.
-     *  @param cacheId Cache ID.
+     *
+     * @param cacheId Cache ID.
      * @param nodeId Node ID.
      * @param futId Future ID.
      * @param topVer Topology version.
-     * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
      * @param op Cache update operation.
-     * @param retval Return value required flag.
      * @param expiryPlc Expiry policy.
      * @param invokeArgs Optional arguments for entry processor.
      * @param filter Optional filter for atomic check.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
-     * @param needPrimaryRes {@code True} if client expects primary node 
response.
-     * @param skipStore Skip write-through to a persistent storage.
-     * @param keepBinary Keep binary flag.
+     * @param flags Flags.
      * @param addDepInfo Deployment info flag.
      * @param maxEntryCnt Maximum entries count.
      */
@@ -141,19 +138,14 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
         UUID nodeId,
         long futId,
         @NotNull AffinityTopologyVersion topVer,
-        boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
         GridCacheOperation op,
-        boolean retval,
         @Nullable ExpiryPolicy expiryPlc,
         @Nullable Object[] invokeArgs,
         @Nullable CacheEntryPredicate[] filter,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean needPrimaryRes,
-        boolean skipStore,
-        boolean keepBinary,
-        boolean recovery,
+        byte flags,
         boolean addDepInfo,
         int maxEntryCnt
     ) {
@@ -161,17 +153,13 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
             nodeId,
             futId,
             topVer,
-            topLocked,
             syncMode,
             op,
-            retval,
             subjId,
             taskNameHash,
-            needPrimaryRes,
-            skipStore,
-            keepBinary,
-            recovery,
+            flags,
             addDepInfo);
+
         this.expiryPlc = expiryPlc;
         this.invokeArgs = invokeArgs;
         this.filter = filter;

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index f5bd889..5c66bc4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@ -55,20 +55,17 @@ public class GridNearAtomicSingleUpdateFilterRequest 
extends GridNearAtomicSingl
 
     /**
      * Constructor.
-     *  @param cacheId Cache ID.
+     *
+     * @param cacheId Cache ID.
      * @param nodeId Node ID.
      * @param futId Future ID.
      * @param topVer Topology version.
-     * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
      * @param op Cache update operation.
-     * @param retval Return value required flag.
      * @param filter Optional filter for atomic check.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
-     * @param needPrimaryRes {@code True} if client expects primary node 
response.
-     * @param skipStore Skip write-through to a persistent storage.
-     * @param keepBinary Keep binary flag.
+     * @param flags Flags.
      * @param addDepInfo Deployment info flag.
      */
     GridNearAtomicSingleUpdateFilterRequest(
@@ -76,17 +73,12 @@ public class GridNearAtomicSingleUpdateFilterRequest 
extends GridNearAtomicSingl
         UUID nodeId,
         long futId,
         @NotNull AffinityTopologyVersion topVer,
-        boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
         GridCacheOperation op,
-        boolean retval,
         @Nullable CacheEntryPredicate[] filter,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean needPrimaryRes,
-        boolean skipStore,
-        boolean keepBinary,
-        boolean recovery,
+        byte flags,
         boolean addDepInfo
     ) {
         super(
@@ -94,16 +86,11 @@ public class GridNearAtomicSingleUpdateFilterRequest 
extends GridNearAtomicSingl
             nodeId,
             futId,
             topVer,
-            topLocked,
             syncMode,
             op,
-            retval,
             subjId,
             taskNameHash,
-            needPrimaryRes,
-            skipStore,
-            keepBinary,
-            recovery,
+            flags,
             addDepInfo
         );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 6ffa373..60d94b4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -547,8 +547,7 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
         else
             val = 
EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), 
(EntryProcessor)val);
 
-        boolean mappingKnown = cctx.topology().rebalanceFinished(topVer) &&
-            !cctx.discovery().hasNearCache(cctx.cacheId(), topVer);
+        boolean mappingKnown = cctx.topology().rebalanceFinished(topVer);
 
         List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer);
 
@@ -558,10 +557,19 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
 
         ClusterNode primary = nodes.get(0);
 
-        boolean needPrimaryRes = !mappingKnown || primary.isLocal() || 
nodes.size() == 1;
+        boolean needPrimaryRes = !mappingKnown || primary.isLocal() || 
nodes.size() == 1 || nearEnabled;
 
         GridNearAtomicAbstractUpdateRequest req;
 
+        byte flags = GridNearAtomicAbstractUpdateRequest.flags(nearEnabled,
+            topLocked,
+            retval,
+            mappingKnown,
+            needPrimaryRes,
+            skipStore,
+            keepBinary,
+            recovery);
+
         if (canUseSingleRequest()) {
             if (op == TRANSFORM) {
                 req = new GridNearAtomicSingleUpdateInvokeRequest(
@@ -569,17 +577,12 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
                     primary.id(),
                     futId,
                     topVer,
-                    topLocked,
                     syncMode,
                     op,
-                    retval,
                     invokeArgs,
                     subjId,
                     taskNameHash,
-                    needPrimaryRes,
-                    skipStore,
-                    keepBinary,
-                    recovery,
+                    flags,
                     cctx.deploymentEnabled());
             }
             else {
@@ -589,16 +592,11 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
                         primary.id(),
                         futId,
                         topVer,
-                        topLocked,
                         syncMode,
                         op,
-                        retval,
                         subjId,
                         taskNameHash,
-                        needPrimaryRes,
-                        skipStore,
-                        keepBinary,
-                        recovery,
+                        flags,
                         cctx.deploymentEnabled());
                 }
                 else {
@@ -607,17 +605,12 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
                         primary.id(),
                         futId,
                         topVer,
-                        topLocked,
                         syncMode,
                         op,
-                        retval,
                         filter,
                         subjId,
                         taskNameHash,
-                        needPrimaryRes,
-                        skipStore,
-                        keepBinary,
-                        recovery,
+                        flags,
                         cctx.deploymentEnabled());
                 }
             }
@@ -628,19 +621,14 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
                 primary.id(),
                 futId,
                 topVer,
-                topLocked,
                 syncMode,
                 op,
-                retval,
                 expiryPlc,
                 invokeArgs,
                 filter,
                 subjId,
                 taskNameHash,
-                needPrimaryRes,
-                skipStore,
-                keepBinary,
-                recovery,
+                flags,
                 cctx.deploymentEnabled(),
                 1);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
index 850a0f9..b1fb530 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@ -73,20 +73,17 @@ public class GridNearAtomicSingleUpdateInvokeRequest 
extends GridNearAtomicSingl
 
     /**
      * Constructor.
-     *  @param cacheId Cache ID.
+     *
+     * @param cacheId Cache ID.
      * @param nodeId Node ID.
      * @param futId Future ID.
      * @param topVer Topology version.
-     * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
      * @param op Cache update operation.
-     * @param retval Return value required flag.
      * @param invokeArgs Optional arguments for entry processor.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
-     * @param needPrimaryRes {@code True} if client expects primary node 
response.
-     * @param skipStore Skip write-through to a persistent storage.
-     * @param keepBinary Keep binary flag.
+     * @param flags Flags.
      * @param addDepInfo Deployment info flag.
      */
     GridNearAtomicSingleUpdateInvokeRequest(
@@ -94,17 +91,12 @@ public class GridNearAtomicSingleUpdateInvokeRequest 
extends GridNearAtomicSingl
         UUID nodeId,
         long futId,
         @NotNull AffinityTopologyVersion topVer,
-        boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
         GridCacheOperation op,
-        boolean retval,
         @Nullable Object[] invokeArgs,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean needPrimaryRes,
-        boolean skipStore,
-        boolean keepBinary,
-        boolean recovery,
+        byte flags,
         boolean addDepInfo
     ) {
         super(
@@ -112,16 +104,11 @@ public class GridNearAtomicSingleUpdateInvokeRequest 
extends GridNearAtomicSingl
             nodeId,
             futId,
             topVer,
-            topLocked,
             syncMode,
             op,
-            retval,
             subjId,
             taskNameHash,
-            needPrimaryRes,
-            skipStore,
-            keepBinary,
-            recovery,
+            flags,
             addDepInfo
         );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 78e0f5d..dd3a7be 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -71,15 +71,11 @@ public class GridNearAtomicSingleUpdateRequest extends 
GridNearAtomicAbstractSin
      * @param nodeId Node ID.
      * @param futId Future ID.
      * @param topVer Topology version.
-     * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
      * @param op Cache update operation.
-     * @param retval Return value required flag.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
-     * @param needPrimaryRes {@code True} if client expects primary node 
response.
-     * @param skipStore Skip write-through to a persistent storage.
-     * @param keepBinary Keep binary flag.
+     * @param flags Flags.
      * @param addDepInfo Deployment info flag.
      */
     GridNearAtomicSingleUpdateRequest(
@@ -87,32 +83,22 @@ public class GridNearAtomicSingleUpdateRequest extends 
GridNearAtomicAbstractSin
         UUID nodeId,
         long futId,
         @NotNull AffinityTopologyVersion topVer,
-        boolean topLocked,
         CacheWriteSynchronizationMode syncMode,
         GridCacheOperation op,
-        boolean retval,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean needPrimaryRes,
-        boolean skipStore,
-        boolean keepBinary,
-        boolean recovery,
+        byte flags,
         boolean addDepInfo
     ) {
         super(cacheId,
             nodeId,
             futId,
             topVer,
-            topLocked,
             syncMode,
             op,
-            retval,
             subjId,
             taskNameHash,
-            needPrimaryRes,
-            skipStore,
-            keepBinary,
-            recovery,
+            flags,
             addDepInfo
         );
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 46a3c34..190ed3f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -684,11 +684,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
             else {
                 try {
-                    if (req.initMappingLocally() && reqState.dhtNodes.isEmpty()) {
-                        reqState.dhtNodes = null;
-
-                        req.needPrimaryResponse(true);
-                    }
+                    if (req.initMappingLocally() && reqState.mappedNodes.isEmpty())
+                        reqState.resetLocalMapping();
 
                     cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
@@ -753,8 +750,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         int size = keys.size();
 
-        boolean mappingKnown = cctx.topology().rebalanceFinished(topVer) &&
-            !cctx.discovery().hasNearCache(cctx.cacheId(), topVer);
+        boolean mappingKnown = cctx.topology().rebalanceFinished(topVer);
 
         try {
             if (size == 1) {
@@ -1001,31 +997,35 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
             ClusterNode primary = nodes.get(0);
 
-            boolean needPrimaryRes = !mappingKnown || primary.isLocal();
+            boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nearEnabled;
 
             UUID nodeId = primary.id();
 
             PrimaryRequestState mapped = pendingMappings.get(nodeId);
 
             if (mapped == null) {
+                byte flags = GridNearAtomicAbstractUpdateRequest.flags(nearEnabled,
+                    topLocked,
+                    retval,
+                    mappingKnown,
+                    needPrimaryRes,
+                    skipStore,
+                    keepBinary,
+                    recovery);
+
                 GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
                     cctx.cacheId(),
                     nodeId,
                     futId,
                     topVer,
-                    topLocked,
                     syncMode,
                     op,
-                    retval,
                     expiryPlc,
                     invokeArgs,
                     filter,
                     subjId,
                     taskNameHash,
-                    needPrimaryRes,
-                    skipStore,
-                    keepBinary,
-                    recovery,
+                    flags,
                     cctx.deploymentEnabled(),
                     keys.size());
 
@@ -1112,26 +1112,30 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         ClusterNode primary = nodes.get(0);
 
-        boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1;
+        boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1 || nearEnabled;
+
+        byte flags = GridNearAtomicAbstractUpdateRequest.flags(nearEnabled,
+            topLocked,
+            retval,
+            mappingKnown,
+            needPrimaryRes,
+            skipStore,
+            keepBinary,
+            recovery);
 
         GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
             cctx.cacheId(),
             primary.id(),
             futId,
             topVer,
-            topLocked,
             syncMode,
             op,
-            retval,
             expiryPlc,
             invokeArgs,
             filter,
             subjId,
             taskNameHash,
-            needPrimaryRes,
-            skipStore,
-            keepBinary,
-            recovery,
+            flags,
             cctx.deploymentEnabled(),
             1);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 5ba024f..9492164 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -78,7 +78,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements
     /** */
     @GridDirectCollection(UUID.class)
     @GridToStringInclude
-    private List<UUID> dhtNodes;
+    private List<UUID> mapping;
 
     /** */
     @GridDirectTransient
@@ -126,17 +126,17 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements
     }
 
     /**
-     * @param dhtNodes DHT nodes.
+     * @param mapping Mapping.
      */
-    public void dhtNodes(List<UUID> dhtNodes) {
-        this.dhtNodes = dhtNodes;
+    public void mapping(List<UUID> mapping) {
+        this.mapping = mapping;
     }
 
     /**
      * @return DHT nodes.
      */
-    @Nullable public List<UUID> dhtNodes() {
-        return dhtNodes;
+    @Nullable public List<UUID> mapping() {
+        return mapping;
     }
 
     /**
@@ -406,19 +406,19 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeCollection("dhtNodes", dhtNodes, MessageCollectionItemType.UUID))
+                if (!writer.writeMessage("errs", errs))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMessage("errs", errs))
+                if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeLong("futId", futId))
+                if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
                     return false;
 
                 writer.incrementState();
@@ -464,7 +464,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements
 
         switch (reader.state()) {
             case 3:
-                dhtNodes = reader.readCollection("dhtNodes", MessageCollectionItemType.UUID);
+                errs = reader.readMessage("errs");
 
                 if (!reader.isLastRead())
                     return false;
@@ -472,7 +472,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements
                 reader.incrementState();
 
             case 4:
-                errs = reader.readMessage("errs");
+                futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -480,7 +480,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements
                 reader.incrementState();
 
             case 5:
-                futId = reader.readLong("futId");
+                mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/851b9ad5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 749ebe8..0bfb4fb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
@@ -747,6 +748,133 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testPutReaderUpdate1() throws Exception {
+        readerUpdateDhtFails(false, false, false);
+
+        stopAllGrids();
+
+        readerUpdateDhtFails(false, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutReaderUpdate2() throws Exception {
+        readerUpdateDhtFails(true, false, false);
+
+        stopAllGrids();
+
+        readerUpdateDhtFails(true, true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllReaderUpdate1() throws Exception {
+        readerUpdateDhtFails(false, false, true);
+
+        stopAllGrids();
+
+        readerUpdateDhtFails(false, true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllReaderUpdate2() throws Exception {
+        readerUpdateDhtFails(true, false, true);
+
+        stopAllGrids();
+
+        readerUpdateDhtFails(true, true, true);
+    }
+
+    /**
+     * @param updateNearEnabled {@code True} if enable near cache for second put.
+     * @param delayReader If {@code true} delay reader response, otherwise delay backup response.
+     * @param putAll If {@code true} use putAll, otherwise put.
+     * @throws Exception If failed.
+     */
+    private void readerUpdateDhtFails(boolean updateNearEnabled,
+        boolean delayReader,
+        boolean putAll) throws Exception {
+        ccfg = cacheConfiguration(1, FULL_SYNC);
+
+        client = false;
+
+        startServers(2);
+
+        awaitPartitionMapExchange();
+
+        Ignite srv0 = ignite(0);
+        Ignite srv1 = ignite(1);
+
+        List<Integer> keys = primaryKeys(srv0.cache(TEST_CACHE), putAll ? 3 : 1);
+
+        ccfg = null;
+
+        client = true;
+
+        Ignite client1 = startGrid(2);
+
+        IgniteCache<Object, Object> cache1 = client1.createNearCache(TEST_CACHE, new NearCacheConfiguration<>());
+
+        Ignite client2 = startGrid(3);
+
+        IgniteCache<Object, Object> cache2 = updateNearEnabled ?
+            client2.createNearCache(TEST_CACHE, new NearCacheConfiguration<>()) : client2.cache(TEST_CACHE);
+
+        if (putAll) {
+            Map<Integer, Integer> map = new HashMap<>();
+
+            for (Integer key : keys)
+                map.put(key, 1);
+
+            cache1.putAll(map);
+        }
+        else
+            cache1.put(keys.get(0), 1);
+
+        if (delayReader)
+            testSpi(client1).blockMessages(GridDhtAtomicNearResponse.class, client2.name());
+        else
+            testSpi(srv1).blockMessages(GridDhtAtomicNearResponse.class, client2.name());
+
+        Map<Integer, Integer> map;
+
+        IgniteFuture<?> fut;
+
+        if (putAll) {
+            map = new HashMap<>();
+
+            for (Integer key : keys)
+                map.put(key, 1);
+
+            fut = cache2.putAllAsync(map);
+        }
+        else {
+            map = F.asMap(keys.get(0), 2);
+
+            fut = cache2.putAsync(keys.get(0), 2);
+        }
+
+        U.sleep(2000);
+
+        assertFalse(fut.isDone());
+
+        if (delayReader)
+            testSpi(client1).stopBlock();
+        else
+            testSpi(srv1).stopBlock();
+
+        fut.get();
+
+        checkData(map);
+    }
+
+    /**
      * @param expData Expected cache data.
      */
     private void checkData(Map<Integer, Integer> expData) {

Reply via email to