Finalization.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4b78262c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4b78262c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4b78262c

Branch: refs/heads/ignite-2926
Commit: 4b78262c678c5f12bb6952576b7cdf7c87c7c346
Parents: c8b1bb1
Author: vozerov-gridgain <[email protected]>
Authored: Fri Apr 15 14:34:19 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Fri Apr 15 14:34:19 2016 +0300

----------------------------------------------------------------------
 .../GridAbstractNearAtomicUpdateFuture.java     | 220 -----------
 .../dht/atomic/GridDhtAtomicCache.java          | 107 ++++--
 .../GridNearAtomicAbstractUpdateFuture.java     | 217 +++++++++++
 .../GridNearAtomicSingleUpdateFuture.java       | 363 +++----------------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  53 +--
 5 files changed, 375 insertions(+), 585 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4b78262c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java
deleted file mode 100644
index 3088d05..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import 
org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
-import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-import javax.cache.expiry.ExpiryPolicy;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
-
-/**
- * Base for near atomic update futures.
- */
-public abstract class GridAbstractNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
-    implements GridCacheAtomicFuture<Object> {
-    /** Logger reference. */
-    private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
-
-    /** Logger. */
-    protected static IgniteLogger log;
-
-    /** Cache context. */
-    protected final GridCacheContext cctx;
-
-    /** Cache. */
-    protected final GridDhtAtomicCache cache;
-
-    /** Write synchronization mode. */
-    protected final CacheWriteSynchronizationMode syncMode;
-
-    /** Update operation. */
-    protected final GridCacheOperation op;
-
-    /** Return value require flag. */
-    protected final boolean retval;
-
-    /** Raw return value flag. */
-    protected final boolean rawRetval;
-
-    /** Expiry policy. */
-    protected final ExpiryPolicy expiryPlc;
-
-    /** Optional filter. */
-    protected final CacheEntryPredicate[] filter;
-
-    /** Subject ID. */
-    protected final UUID subjId;
-
-    /** Task name hash. */
-    protected final int taskNameHash;
-
-    /** Skip store flag. */
-    protected final boolean skipStore;
-
-    /** Keep binary flag. */
-    protected final boolean keepBinary;
-
-    /** Wait for topology future flag. */
-    protected final boolean waitTopFut;
-
-    /** Fast map flag. */
-    protected final boolean fastMap;
-
-    /** Near cache flag. */
-    protected final boolean nearEnabled;
-
-    /** Mutex to synchronize state updates. */
-    protected final Object mux = new Object();
-
-    /** Topology locked flag. Set if atomic update is performed inside a TX or 
explicit lock. */
-    protected boolean topLocked;
-
-    /** Remap count. */
-    protected int remapCnt;
-
-    /** Current topology version. */
-    protected AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
-
-    /** */
-    protected GridCacheVersion updVer;
-
-    /** Topology version when got mapping error. */
-    protected AffinityTopologyVersion mapErrTopVer;
-
-    /** */
-    protected int resCnt;
-
-    /** Error. */
-    protected CachePartialUpdateCheckedException err;
-
-    /** Future ID. */
-    protected GridCacheVersion futVer;
-
-    /** Completion future for a particular topology version. */
-    protected GridFutureAdapter<Void> topCompleteFut;
-
-    /** Operation result. */
-    protected GridCacheReturn opRes;
-
-    /**
-     * Constructor.
-     */
-    protected GridAbstractNearAtomicUpdateFuture(
-        GridCacheContext cctx,
-        GridDhtAtomicCache cache,
-        CacheWriteSynchronizationMode syncMode,
-        GridCacheOperation op,
-        boolean retval,
-        boolean rawRetval,
-        @Nullable ExpiryPolicy expiryPlc,
-        CacheEntryPredicate[] filter,
-        UUID subjId,
-        int taskNameHash,
-        boolean skipStore,
-        boolean keepBinary,
-        boolean waitTopFut
-    ) {
-        if (log == null)
-            log = U.logger(cctx.kernalContext(), logRef, 
GridFutureAdapter.class);
-
-        this.cctx = cctx;
-        this.cache = cache;
-        this.syncMode = syncMode;
-        this.op = op;
-        this.retval = retval;
-        this.rawRetval = rawRetval;
-        this.expiryPlc = expiryPlc;
-        this.filter = filter;
-        this.subjId = subjId;
-        this.taskNameHash = taskNameHash;
-        this.skipStore = skipStore;
-        this.keepBinary = keepBinary;
-        this.waitTopFut = waitTopFut;
-
-        fastMap = F.isEmpty(filter) && op != TRANSFORM && 
cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
-            cctx.config().getAtomicWriteOrderMode() == CLOCK &&
-            !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
-
-        nearEnabled = CU.isNearEnabled(cctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteUuid futureId() {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean trackable() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void markNotTrackable() {
-        // No-op.
-    }
-
-    /**
-     * @return {@code True} future is stored by {@link 
GridCacheMvccManager#addAtomicFuture}.
-     */
-    protected boolean storeFuture() {
-        return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != 
FULL_ASYNC;
-    }
-
-    /**
-     * Maps key to nodes. If filters are absent and operation is not 
TRANSFORM, then we can assign version on near
-     * node and send updates in parallel to all participating nodes.
-     *
-     * @param key Key to map.
-     * @param topVer Topology version to map.
-     * @return Collection of nodes to which key is mapped.
-     */
-    protected Collection<ClusterNode> mapKey(KeyCacheObject key, 
AffinityTopologyVersion topVer) {
-        GridCacheAffinityManager affMgr = cctx.affinity();
-
-        // If we can send updates in parallel - do it.
-        return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) :
-            Collections.singletonList(affMgr.primary(key, topVer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b78262c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4f8b32c..013184b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -984,7 +984,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         ctx.checkSecurity(SecurityPermission.CACHE_PUT);
 
-        final GridNearAtomicUpdateFuture updateFut =
+        final GridNearAtomicAbstractUpdateFuture updateFut =
             createSingleUpdateFuture(key, val, proc, invokeArgs, retval, 
filter, waitTopFut);
 
         return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@@ -1016,7 +1016,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         ctx.checkSecurity(SecurityPermission.CACHE_REMOVE);
 
-        final GridNearAtomicUpdateFuture updateFut =
+        final GridNearAtomicAbstractUpdateFuture updateFut =
             createSingleUpdateFuture(key, null, null, null, retval, filter, 
true);
 
         if (statsEnabled)
@@ -1043,7 +1043,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
      * @param waitTopFut Whether to wait for topology future.
      * @return Future.
      */
-    private GridNearAtomicUpdateFuture createSingleUpdateFuture(
+    private GridNearAtomicAbstractUpdateFuture createSingleUpdateFuture(
         K key,
         @Nullable V val,
         @Nullable EntryProcessor proc,
@@ -1055,19 +1055,19 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         GridCacheOperation op;
-        Collection vals;
+        Object val0;
 
         if (val != null) {
             op = UPDATE;
-            vals = Collections.singletonList(val);
+            val0 = val;
         }
         else if (proc != null) {
             op = TRANSFORM;
-            vals = Collections.singletonList(proc);
+            val0 = proc;
         }
         else {
             op = DELETE;
-            vals = null;
+            val0 = null;
         }
 
         GridCacheDrInfo conflictPutVal = null;
@@ -1081,37 +1081,75 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             if (op == UPDATE) {
                 conflictPutVal = new GridCacheDrInfo(ctx.toCacheObject(val), 
ctx.versions().next(dcId));
 
-                vals = null;
+                val0 = null;
             }
             else if (op == GridCacheOperation.TRANSFORM) {
                 conflictPutVal = new GridCacheDrInfo(proc, 
ctx.versions().next(dcId));
 
-                vals = null;
+                val0 = null;
             }
             else
                 conflictRmvVer = ctx.versions().next(dcId);
         }
 
-        return new GridNearAtomicUpdateFuture(
-            ctx,
-            this,
-            ctx.config().getWriteSynchronizationMode(),
-            op,
-            Collections.singletonList(key),
-            vals,
-            invokeArgs,
-            conflictPutVal != null ? Collections.singleton(conflictPutVal) : 
null,
-            conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : 
null,
-            retval,
-            false,
-            opCtx != null ? opCtx.expiry() : null,
-            CU.filterArray(filter),
-            ctx.subjectIdPerCall(null, opCtx),
-            ctx.kernalContext().job().currentTaskNameHash(),
-            opCtx != null && opCtx.skipStore(),
-            opCtx != null && opCtx.isKeepBinary(),
-            opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
-            waitTopFut);
+        CacheEntryPredicate[] filters = CU.filterArray(filter);
+
+        if (conflictPutVal == null && conflictRmvVer == null && 
!isFastMap(filters, op)) {
+            return new GridNearAtomicSingleUpdateFuture(
+                ctx,
+                this,
+                ctx.config().getWriteSynchronizationMode(),
+                op,
+                key,
+                val0,
+                invokeArgs,
+                retval,
+                false,
+                opCtx != null ? opCtx.expiry() : null,
+                filters,
+                ctx.subjectIdPerCall(null, opCtx),
+                ctx.kernalContext().job().currentTaskNameHash(),
+                opCtx != null && opCtx.skipStore(),
+                opCtx != null && opCtx.isKeepBinary(),
+                opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+                waitTopFut
+            );
+        }
+        else {
+            return new GridNearAtomicUpdateFuture(
+                ctx,
+                this,
+                ctx.config().getWriteSynchronizationMode(),
+                op,
+                Collections.singletonList(key),
+                val0 != null ? Collections.singletonList(val0) : null,
+                invokeArgs,
+                conflictPutVal != null ? Collections.singleton(conflictPutVal) 
: null,
+                conflictRmvVer != null ? Collections.singleton(conflictRmvVer) 
: null,
+                retval,
+                false,
+                opCtx != null ? opCtx.expiry() : null,
+                filters,
+                ctx.subjectIdPerCall(null, opCtx),
+                ctx.kernalContext().job().currentTaskNameHash(),
+                opCtx != null && opCtx.skipStore(),
+                opCtx != null && opCtx.isKeepBinary(),
+                opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+                waitTopFut);
+        }
+    }
+
+    /**
+     * Whether this is fast-map operation.
+     *
+     * @param filters Filters.
+     * @param op Operation.
+     * @return {@code True} if fast-map.
+     */
+    public boolean isFastMap(CacheEntryPredicate[] filters, GridCacheOperation 
op) {
+        return F.isEmpty(filters) && op != TRANSFORM && 
ctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+            ctx.config().getAtomicWriteOrderMode() == CLOCK &&
+            !(ctx.writeThrough() && ctx.config().getInterceptor() != null);
     }
 
     /**
@@ -2893,10 +2931,15 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         res.nodeId(ctx.localNodeId());
 
-        GridNearAtomicUpdateFuture fut = 
(GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+        GridNearAtomicAbstractUpdateFuture fut =
+            
(GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
 
-        if (fut != null)
-            fut.onResult(nodeId, res, false);
+        if (fut != null) {
+            if (fut instanceof GridNearAtomicSingleUpdateFuture)
+                ((GridNearAtomicSingleUpdateFuture)fut).onResult(nodeId, res, 
false);
+            else
+                ((GridNearAtomicUpdateFuture)fut).onResult(nodeId, res, false);
+        }
         else
             U.warn(log, "Failed to find near update future for update response 
(will ignore) " +
                 "[nodeId=" + nodeId + ", res=" + res + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b78262c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
new file mode 100644
index 0000000..0c40969
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import 
org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.expiry.ExpiryPolicy;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+
+/**
+ * Base for near atomic update futures.
+ */
+public abstract class GridNearAtomicAbstractUpdateFuture extends 
GridFutureAdapter<Object>
+    implements GridCacheAtomicFuture<Object> {
+    /** Logger reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
+
+    /** Logger. */
+    protected static IgniteLogger log;
+
+    /** Cache context. */
+    protected final GridCacheContext cctx;
+
+    /** Cache. */
+    protected final GridDhtAtomicCache cache;
+
+    /** Write synchronization mode. */
+    protected final CacheWriteSynchronizationMode syncMode;
+
+    /** Update operation. */
+    protected final GridCacheOperation op;
+
+    /** Return value require flag. */
+    protected final boolean retval;
+
+    /** Raw return value flag. */
+    protected final boolean rawRetval;
+
+    /** Expiry policy. */
+    protected final ExpiryPolicy expiryPlc;
+
+    /** Optional filter. */
+    protected final CacheEntryPredicate[] filter;
+
+    /** Subject ID. */
+    protected final UUID subjId;
+
+    /** Task name hash. */
+    protected final int taskNameHash;
+
+    /** Skip store flag. */
+    protected final boolean skipStore;
+
+    /** Keep binary flag. */
+    protected final boolean keepBinary;
+
+    /** Wait for topology future flag. */
+    protected final boolean waitTopFut;
+
+    /** Near cache flag. */
+    protected final boolean nearEnabled;
+
+    /** Mutex to synchronize state updates. */
+    protected final Object mux = new Object();
+
+    /** Topology locked flag. Set if atomic update is performed inside a TX or 
explicit lock. */
+    protected boolean topLocked;
+
+    /** Remap count. */
+    protected int remapCnt;
+
+    /** Current topology version. */
+    protected AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+
+    /** */
+    protected GridCacheVersion updVer;
+
+    /** Topology version when got mapping error. */
+    protected AffinityTopologyVersion mapErrTopVer;
+
+    /** */
+    protected int resCnt;
+
+    /** Error. */
+    protected CachePartialUpdateCheckedException err;
+
+    /** Future ID. */
+    protected GridCacheVersion futVer;
+
+    /** Completion future for a particular topology version. */
+    protected GridFutureAdapter<Void> topCompleteFut;
+
+    /** Operation result. */
+    protected GridCacheReturn opRes;
+
+    /**
+     * Constructor.
+     */
+    protected GridNearAtomicAbstractUpdateFuture(
+        GridCacheContext cctx,
+        GridDhtAtomicCache cache,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        boolean retval,
+        boolean rawRetval,
+        @Nullable ExpiryPolicy expiryPlc,
+        CacheEntryPredicate[] filter,
+        UUID subjId,
+        int taskNameHash,
+        boolean skipStore,
+        boolean keepBinary,
+        boolean waitTopFut
+    ) {
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, 
GridFutureAdapter.class);
+
+        this.cctx = cctx;
+        this.cache = cache;
+        this.syncMode = syncMode;
+        this.op = op;
+        this.retval = retval;
+        this.rawRetval = rawRetval;
+        this.expiryPlc = expiryPlc;
+        this.filter = filter;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.skipStore = skipStore;
+        this.keepBinary = keepBinary;
+        this.waitTopFut = waitTopFut;
+
+        nearEnabled = CU.isNearEnabled(cctx);
+    }
+
+    /**
+     * Performs future mapping.
+     */
+    public void map() {
+        AffinityTopologyVersion topVer = 
cctx.shared().lockedTopologyVersion(null);
+
+        if (topVer == null)
+            mapOnTopology();
+        else {
+            topLocked = true;
+
+            // Cannot remap.
+            remapCnt = 1;
+
+            map(topVer);
+        }
+    }
+
+    /**
+     * @param topVer Topology version.
+     */
+    protected abstract void map(AffinityTopologyVersion topVer);
+
+    /**
+     * Maps future on ready topology.
+     */
+    protected abstract void mapOnTopology();
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid futureId() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean trackable() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void markNotTrackable() {
+        // No-op.
+    }
+
+    /**
+     * @return {@code True} future is stored by {@link 
GridCacheMvccManager#addAtomicFuture}.
+     */
+    protected boolean storeFuture() {
+        return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != 
FULL_ASYNC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b78262c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index ce5b41a..3917936 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -36,14 +36,12 @@ import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtom
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import javax.cache.expiry.ExpiryPolicy;
@@ -55,13 +53,13 @@ import java.util.UUID;
 
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 
 /**
  * DHT atomic cache near update future.
  */
-public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpdateFuture {
+// TODO: Only for !fastMap, only for !conflicts
+public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpdateFuture {
     /** Keys */
     private Object key;
 
@@ -72,12 +70,8 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
     /** Optional arguments for entry processor. */
     private Object[] invokeArgs;
 
-    /** Mappings if operations is mapped to more than one node. */
-    @GridToStringInclude
-    private Map<UUID, GridNearAtomicUpdateRequest> mappings;
-
     /** Not null is operation is mapped to single node. */
-    private GridNearAtomicUpdateRequest singleReq;
+    private GridNearAtomicUpdateRequest req;
 
     /**
      * @param cctx Cache context.
@@ -144,12 +138,7 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
         GridNearAtomicUpdateResponse res = null;
 
         synchronized (mux) {
-            GridNearAtomicUpdateRequest req;
-
-            if (singleReq != null)
-                req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
-            else
-                req = mappings != null ? mappings.get(nodeId) : null;
+            GridNearAtomicUpdateRequest req = this.req.nodeId().equals(nodeId) 
? this.req : null;
 
             if (req != null && req.response() == null) {
                 res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
@@ -172,50 +161,8 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
         return false;
     }
 
-    /**
-     * Performs future mapping.
-     */
-    public void map() {
-        AffinityTopologyVersion topVer = 
cctx.shared().lockedTopologyVersion(null);
-
-        if (topVer == null)
-            mapOnTopology();
-        else {
-            topLocked = true;
-
-            // Cannot remap.
-            remapCnt = 1;
-
-            map(topVer);
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Void> 
completeFuture(AffinityTopologyVersion topVer) {
-        // Wait fast-map near atomic update futures in CLOCK mode.
-        if (fastMap) {
-            GridFutureAdapter<Void> fut;
-
-            synchronized (mux) {
-                if (this.topVer != AffinityTopologyVersion.ZERO && 
this.topVer.compareTo(topVer) < 0) {
-                    if (topCompleteFut == null)
-                        topCompleteFut = new GridFutureAdapter<>();
-
-                    fut = topCompleteFut;
-                }
-                else
-                    fut = null;
-            }
-
-            if (fut != null && isDone()) {
-                fut.onDone();
-
-                return null;
-            }
-
-            return fut;
-        }
-
         return null;
     }
 
@@ -261,41 +208,20 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
 
-        boolean rcvAll;
-
         GridFutureAdapter<?> fut0 = null;
 
         synchronized (mux) {
             if (!res.futureVersion().equals(futVer))
                 return;
 
-            if (singleReq != null) {
-                if (!singleReq.nodeId().equals(nodeId))
-                    return;
-
-                req = singleReq;
-
-                singleReq = null;
+            if (!this.req.nodeId().equals(nodeId))
+                return;
 
-                rcvAll = true;
-            }
-            else {
-                req = mappings != null ? mappings.get(nodeId) : null;
+            req = this.req;
 
-                if (req != null && req.onResponse(res)) {
-                    resCnt++;
-
-                    rcvAll = mappings.size() == resCnt;
-                }
-                else
-                    return;
-            }
-
-            assert req != null && req.topologyVersion().equals(topVer) : req;
+            this.req = null;
 
             if (res.remapKeys() != null) {
-                assert !fastMap || cctx.kernalContext().clientNode();
-
                 if (mapErrTopVer == null || 
mapErrTopVer.compareTo(req.topologyVersion()) < 0)
                     mapErrTopVer = req.topologyVersion();
             }
@@ -334,51 +260,49 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
                 }
             }
 
-            if (rcvAll) {
-                if (res.remapKeys() != null) {
-                    assert mapErrTopVer != null;
+            if (res.remapKeys() != null) {
+                assert mapErrTopVer != null;
 
-                    remapTopVer = cctx.shared().exchange().topologyVersion();
-                }
-                else {
-                    if (err != null &&
-                        X.hasCause(err, 
CachePartialUpdateCheckedException.class) &&
-                        X.hasCause(err, ClusterTopologyCheckedException.class) 
&&
-                        storeFuture() &&
-                        --remapCnt > 0) {
-                        ClusterTopologyCheckedException topErr =
-                            X.cause(err, 
ClusterTopologyCheckedException.class);
+                remapTopVer = cctx.shared().exchange().topologyVersion();
+            }
+            else {
+                if (err != null &&
+                    X.hasCause(err, CachePartialUpdateCheckedException.class) 
&&
+                    X.hasCause(err, ClusterTopologyCheckedException.class) &&
+                    storeFuture() &&
+                    --remapCnt > 0) {
+                    ClusterTopologyCheckedException topErr =
+                        X.cause(err, ClusterTopologyCheckedException.class);
 
-                        if (!(topErr instanceof 
ClusterTopologyServerNotFoundException)) {
-                            CachePartialUpdateCheckedException cause =
-                                X.cause(err, 
CachePartialUpdateCheckedException.class);
+                    if (!(topErr instanceof 
ClusterTopologyServerNotFoundException)) {
+                        CachePartialUpdateCheckedException cause =
+                            X.cause(err, 
CachePartialUpdateCheckedException.class);
 
-                            assert cause != null && cause.topologyVersion() != 
null : err;
+                        assert cause != null && cause.topologyVersion() != 
null : err;
 
-                            remapTopVer =
-                                new 
AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+                        remapTopVer =
+                            new 
AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
 
-                            err = null;
+                        err = null;
 
-                            updVer = null;
-                        }
+                        updVer = null;
                     }
                 }
+            }
 
-                if (remapTopVer == null) {
-                    err0 = err;
-                    opRes0 = opRes;
-                }
-                else {
-                    fut0 = topCompleteFut;
+            if (remapTopVer == null) {
+                err0 = err;
+                opRes0 = opRes;
+            }
+            else {
+                fut0 = topCompleteFut;
 
-                    topCompleteFut = null;
+                topCompleteFut = null;
 
-                    cctx.mvcc().removeAtomicFuture(futVer);
+                cctx.mvcc().removeAtomicFuture(futVer);
 
-                    futVer = null;
-                    topVer = AffinityTopologyVersion.ZERO;
-                }
+                futVer = null;
+                topVer = AffinityTopologyVersion.ZERO;
             }
         }
 
@@ -388,18 +312,13 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
             return;
         }
 
-        if (rcvAll && nearEnabled) {
-            if (mappings != null) {
-                for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
-                    GridNearAtomicUpdateResponse res0 = req0.response();
+        if (nearEnabled && !nodeErr) {
+            if (res.remapKeys() != null || !req.hasPrimary())
+                return;
 
-                    assert res0 != null : req0;
+            GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
 
-                    updateNear(req0, res0);
-                }
-            }
-            else if (!nodeErr)
-                updateNear(req, res);
+            near.processNearAtomicUpdateResponse(req, res);
         }
 
         if (remapTopVer != null) {
@@ -454,31 +373,11 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
             return;
         }
 
-        if (rcvAll)
-            onDone(opRes0, err0);
+        onDone(opRes0, err0);
     }
 
-    /**
-     * Updates near cache.
-     *
-     * @param req Update request.
-     * @param res Update response.
-     */
-    private void updateNear(GridNearAtomicUpdateRequest req, 
GridNearAtomicUpdateResponse res) {
-        assert nearEnabled;
-
-        if (res.remapKeys() != null || !req.hasPrimary())
-            return;
-
-        GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
-        near.processNearAtomicUpdateResponse(req, res);
-    }
-
-    /**
-     * Maps future on ready topology.
-     */
-    private void mapOnTopology() {
+    /** {@inheritDoc} */
+    @Override protected void mapOnTopology() {
         cache.topology().readLock();
 
         AffinityTopologyVersion topVer = null;
@@ -563,50 +462,6 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
     }
 
     /**
-     * Sends messages to remote nodes and updates local cache.
-     *
-     * @param mappings Mappings to send.
-     */
-    private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
-        UUID locNodeId = cctx.localNodeId();
-
-        GridNearAtomicUpdateRequest locUpdate = null;
-
-        // Send messages to remote nodes first, then run local update.
-        for (GridNearAtomicUpdateRequest req : mappings.values()) {
-            if (locNodeId.equals(req.nodeId())) {
-                assert locUpdate == null : "Cannot have more than one local 
mapping [locUpdate=" + locUpdate +
-                    ", req=" + req + ']';
-
-                locUpdate = req;
-            }
-            else {
-                try {
-                    if (log.isDebugEnabled())
-                        log.debug("Sending near atomic update request 
[nodeId=" + req.nodeId() + ", req=" + req + ']');
-
-                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
-                }
-                catch (IgniteCheckedException e) {
-                    onSendError(req, e);
-                }
-            }
-        }
-
-        if (locUpdate != null) {
-            cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
-                new CI2<GridNearAtomicUpdateRequest, 
GridNearAtomicUpdateResponse>() {
-                    @Override public void apply(GridNearAtomicUpdateRequest 
req, GridNearAtomicUpdateResponse res) {
-                        onResult(res.nodeId(), res, false);
-                    }
-                });
-        }
-
-        if (syncMode == FULL_ASYNC)
-            onDone(new GridCacheReturn(cctx, true, true, null, true));
-    }
-
-    /**
      * @param req Request.
      * @param e Error.
      */
@@ -623,10 +478,8 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
         }
     }
 
-    /**
-     * @param topVer Topology version.
-     */
-    void map(AffinityTopologyVersion topVer) {
+    /** {@inheritDoc} */
+    protected void map(AffinityTopologyVersion topVer) {
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {
@@ -638,7 +491,6 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
 
         Exception err = null;
         GridNearAtomicUpdateRequest singleReq0 = null;
-        Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
 
         GridCacheVersion futVer = cctx.versions().next(topVer);
 
@@ -659,31 +511,7 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
             updVer = null;
 
         try {
-            if (!fastMap)
-                singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
-            else {
-                Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = 
mapUpdate(topNodes,
-                    topVer,
-                    futVer,
-                    updVer);
-
-                if (pendingMappings.size() == 1)
-                    singleReq0 = F.firstValue(pendingMappings);
-                else {
-                    if (syncMode == PRIMARY_SYNC) {
-                        mappings0 = U.newHashMap(pendingMappings.size());
-
-                        for (GridNearAtomicUpdateRequest req : 
pendingMappings.values()) {
-                            if (req.hasPrimary())
-                                mappings0.put(req.nodeId(), req);
-                        }
-                    }
-                    else
-                        mappings0 = pendingMappings;
-
-                    assert !mappings0.isEmpty() : this;
-                }
-            }
+            singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
 
             synchronized (mux) {
                 assert this.futVer == null : this;
@@ -695,8 +523,7 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
 
                 resCnt = 0;
 
-                singleReq = singleReq0;
-                mappings = mappings0;
+                req = singleReq0;
             }
         }
         catch (Exception e) {
@@ -718,13 +545,7 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
         }
 
         // Optimize mapping for single key.
-        if (singleReq0 != null)
-            mapSingle(singleReq0.nodeId(), singleReq0);
-        else {
-            assert mappings0 != null;
-
-            doUpdate(mappings0);
-        }
+        mapSingle(singleReq0.nodeId(), singleReq0);
     }
 
     /**
@@ -752,84 +573,6 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
     }
 
     /**
-     * @param topNodes Cache nodes.
-     * @param topVer Topology version.
-     * @param futVer Future version.
-     * @param updVer Update version.
-     * @return Mapping.
-     * @throws Exception If failed.
-     */
-    private Map<UUID, GridNearAtomicUpdateRequest> 
mapUpdate(Collection<ClusterNode> topNodes,
-        AffinityTopologyVersion topVer,
-        GridCacheVersion futVer,
-        @Nullable GridCacheVersion updVer) throws Exception {
-        Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = 
U.newHashMap(topNodes.size());
-
-        if (key == null)
-            throw new NullPointerException("Null key.");
-
-        Object val = this.val;
-
-        if (val == null && op != GridCacheOperation.DELETE)
-            throw new NullPointerException("Null value.");
-
-        KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
-
-        if (op != TRANSFORM)
-            val = cctx.toCacheObject(val);
-
-        Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer);
-
-        if (affNodes.isEmpty())
-            throw new ClusterTopologyServerNotFoundException("Failed to map 
keys for cache " +
-                "(all partition nodes left the grid).");
-
-        int i = 0;
-
-        for (ClusterNode affNode : affNodes) {
-            if (affNode == null)
-                throw new ClusterTopologyServerNotFoundException("Failed to 
map keys for cache " +
-                    "(all partition nodes left the grid).");
-
-            UUID nodeId = affNode.id();
-
-            GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
-
-            if (mapped == null) {
-                mapped = new GridNearAtomicUpdateRequest(
-                    cctx.cacheId(),
-                    nodeId,
-                    futVer,
-                    fastMap,
-                    updVer,
-                    topVer,
-                    topLocked,
-                    syncMode,
-                    op,
-                    retval,
-                    expiryPlc,
-                    invokeArgs,
-                    filter,
-                    subjId,
-                    taskNameHash,
-                    skipStore,
-                    keepBinary,
-                    cctx.kernalContext().clientNode(),
-                    cctx.deploymentEnabled(),
-                    1);
-
-                pendingMappings.put(nodeId, mapped);
-            }
-
-            mapped.addUpdateEntry(cacheKey, val, CU.TTL_NOT_CHANGED, 
CU.EXPIRE_TIME_CALCULATE, null, i == 0);
-
-            i++;
-        }
-
-        return pendingMappings;
-    }
-
-    /**
      * @param topVer Topology version.
      * @param futVer Future version.
      * @param updVer Update version.
@@ -862,7 +605,7 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridAbstractNearAtomicUpda
             cctx.cacheId(),
             primary.id(),
             futVer,
-            fastMap,
+            false,
             updVer,
             topVer,
             topLocked,

http://git-wip-us.apache.org/repos/asf/ignite/blob/4b78262c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index bf7b0e9..009642d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -33,6 +33,7 @@ import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import 
org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -62,7 +63,10 @@ import static 
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
 /**
  * DHT atomic cache near update future.
  */
-public class GridNearAtomicUpdateFuture extends 
GridAbstractNearAtomicUpdateFuture {
+public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFuture {
+    /** Fast map flag. */
+    private final boolean fastMap;
+
     /** Keys */
     private Collection<?> keys;
 
@@ -151,6 +155,8 @@ public class GridNearAtomicUpdateFuture extends 
GridAbstractNearAtomicUpdateFutu
             remapCnt = 1;
 
         this.remapCnt = remapCnt;
+
+        fastMap = cache.isFastMap(filter, op);
     }
 
     /** {@inheritDoc} */
@@ -193,24 +199,6 @@ public class GridNearAtomicUpdateFuture extends 
GridAbstractNearAtomicUpdateFutu
         return false;
     }
 
-    /**
-     * Performs future mapping.
-     */
-    public void map() {
-        AffinityTopologyVersion topVer = 
cctx.shared().lockedTopologyVersion(null);
-
-        if (topVer == null)
-            mapOnTopology();
-        else {
-            topLocked = true;
-
-            // Cannot remap.
-            remapCnt = 1;
-
-            map(topVer, null);
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Void> 
completeFuture(AffinityTopologyVersion topVer) {
         // Wait fast-map near atomic update futures in CLOCK mode.
@@ -510,10 +498,8 @@ public class GridNearAtomicUpdateFuture extends 
GridAbstractNearAtomicUpdateFutu
         near.processNearAtomicUpdateResponse(req, res);
     }
 
-    /**
-     * Maps future on ready topology.
-     */
-    private void mapOnTopology() {
+    /** {@inheritDoc} */
+    @Override protected void mapOnTopology() {
         cache.topology().readLock();
 
         AffinityTopologyVersion topVer = null;
@@ -658,6 +644,11 @@ public class GridNearAtomicUpdateFuture extends 
GridAbstractNearAtomicUpdateFutu
         }
     }
 
+    /** {@inheritDoc} */
+    protected void map(AffinityTopologyVersion topVer) {
+        map(topVer, null);
+    }
+
     /**
      * @param topVer Topology version.
      * @param remapKeys Keys to remap.
@@ -1030,6 +1021,22 @@ public class GridNearAtomicUpdateFuture extends 
GridAbstractNearAtomicUpdateFutu
         return req;
     }
 
+    /**
+     * Maps key to nodes. If filters are absent and operation is not 
TRANSFORM, then we can assign version on near
+     * node and send updates in parallel to all participating nodes.
+     *
+     * @param key Key to map.
+     * @param topVer Topology version to map.
+     * @return Collection of nodes to which key is mapped.
+     */
+    private Collection<ClusterNode> mapKey(KeyCacheObject key, 
AffinityTopologyVersion topVer) {
+        GridCacheAffinityManager affMgr = cctx.affinity();
+
+        // If we can send updates in parallel - do it.
+        return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) :
+            Collections.singletonList(affMgr.primary(key, topVer));
+    }
+
     /** {@inheritDoc} */
     public String toString() {
         synchronized (mux) {

Reply via email to