IGNITE-3075 Implement single key-value pair DHT request/response for ATOMIC 
cache.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/51ca24f2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/51ca24f2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/51ca24f2

Branch: refs/heads/ignite-4242
Commit: 51ca24f2db32dff9c0034603ea3abfe5ef5cd846
Parents: 88f38ac
Author: Konstantin Dudkov <[email protected]>
Authored: Mon Nov 21 16:48:44 2016 +0300
Committer: Konstantin Dudkov <[email protected]>
Committed: Mon Nov 21 16:48:44 2016 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  10 +-
 .../processors/cache/GridCacheIoManager.java    |  25 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |  57 +-
 .../GridDhtAtomicAbstractUpdateRequest.java     | 287 ++++++++
 .../dht/atomic/GridDhtAtomicCache.java          |  17 +-
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |  61 ++
 .../GridDhtAtomicSingleUpdateRequest.java       | 678 +++++++++++++++++++
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  26 +
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  | 312 +++------
 .../GridNearAtomicAbstractUpdateRequest.java    |   5 +
 .../atomic/GridNearAtomicFullUpdateRequest.java | 108 +--
 .../GridNearAtomicSingleUpdateRequest.java      |   5 +
 .../distributed/near/GridNearAtomicCache.java   |   8 +-
 .../GridCacheAtomicMessageCountSelfTest.java    |   6 +-
 ...eAtomicInvalidPartitionHandlingSelfTest.java |   2 +-
 15 files changed, 1292 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index b20de68..f36191c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -67,12 +67,13 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
@@ -774,7 +775,12 @@ public class GridIoMessageFactory implements 
MessageFactory {
 
                 break;
 
-            // [-3..119] [124..127] - this
+            case -36:
+                msg = new GridDhtAtomicSingleUpdateRequest();
+
+                break;
+
+            // [-3..119] [124..127] [-36]- this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL
             default:

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index c5c1c60..924ce79 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -45,6 +45,8 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFini
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
@@ -470,8 +472,8 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
             return 
((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
         else if (cacheMsg instanceof GridNearAtomicUpdateResponse)
             return ((GridNearAtomicUpdateResponse) cacheMsg).futureVersion();
-        else if (cacheMsg instanceof GridDhtAtomicUpdateRequest)
-            return ((GridDhtAtomicUpdateRequest)cacheMsg).futureVersion();
+        else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
+            return 
((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
         else if (cacheMsg instanceof GridDhtAtomicUpdateResponse)
             return ((GridDhtAtomicUpdateResponse) cacheMsg).futureVersion();
 
@@ -486,8 +488,8 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
     @Nullable private GridCacheVersion atomicWriteVersion(GridCacheMessage 
cacheMsg) {
         if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
             return 
((GridNearAtomicAbstractUpdateRequest)cacheMsg).updateVersion();
-        else if (cacheMsg instanceof GridDhtAtomicUpdateRequest)
-            return ((GridDhtAtomicUpdateRequest)cacheMsg).writeVersion();
+        else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
+            return 
((GridDhtAtomicAbstractUpdateRequest)cacheMsg).writeVersion();
 
         return null;
     }
@@ -791,6 +793,21 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
             break;
 
+            case -36: {
+                GridDhtAtomicSingleUpdateRequest req = 
(GridDhtAtomicSingleUpdateRequest)msg;
+
+                GridDhtAtomicUpdateResponse res = new 
GridDhtAtomicUpdateResponse(
+                    ctx.cacheId(),
+                    req.futureVersion(),
+                    ctx.deploymentEnabled());
+
+                res.onError(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+            }
+
+            break;
+
             default:
                 throw new IgniteCheckedException("Failed to send response to 
node. Unsupported direct type [message="
                     + msg + "]", msg.classError());

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 3bbc348..7e4c4e0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -80,7 +81,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridFutureAdapte
     private final CI2<GridNearAtomicAbstractUpdateRequest, 
GridNearAtomicUpdateResponse> completionCb;
 
     /** Update request. */
-    private final GridNearAtomicAbstractUpdateRequest updateReq;
+    protected final GridNearAtomicAbstractUpdateRequest updateReq;
 
     /** Update response. */
     final GridNearAtomicUpdateResponse updateRes;
@@ -90,7 +91,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridFutureAdapte
 
     /** Mappings. */
     @GridToStringInclude
-    protected Map<UUID, GridDhtAtomicUpdateRequest> mappings;
+    protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings;
 
     /** Continuous query closures. */
     private Collection<CI1<Boolean>> cntQryClsrs;
@@ -188,23 +189,16 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridFutureAdapte
             UUID nodeId = node.id();
 
             if (!nodeId.equals(cctx.localNodeId())) {
-                GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+                GridDhtAtomicAbstractUpdateRequest updateReq = 
mappings.get(nodeId);
 
                 if (updateReq == null) {
-                    updateReq = new GridDhtAtomicUpdateRequest(
-                        cctx.cacheId(),
-                        nodeId,
+                    updateReq = createRequest(
+                        node,
                         futVer,
                         writeVer,
                         syncMode,
                         topVer,
-                        forceTransformBackups,
-                        this.updateReq.subjectId(),
-                        this.updateReq.taskNameHash(),
-                        forceTransformBackups ? 
this.updateReq.invokeArguments() : null,
-                        cctx.deploymentEnabled(),
-                        this.updateReq.keepBinary(),
-                        this.updateReq.skipStore());
+                        forceTransformBackups);
 
                     mappings.put(nodeId, updateReq);
                 }
@@ -256,7 +250,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridFutureAdapte
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
         for (UUID nodeId : readers) {
-            GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+            GridDhtAtomicAbstractUpdateRequest updateReq = 
mappings.get(nodeId);
 
             if (updateReq == null) {
                 ClusterNode node = cctx.discovery().node(nodeId);
@@ -265,20 +259,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridFutureAdapte
                 if (node == null)
                     continue;
 
-                updateReq = new GridDhtAtomicUpdateRequest(
-                    cctx.cacheId(),
-                    nodeId,
+                updateReq = createRequest(
+                    node,
                     futVer,
                     writeVer,
                     syncMode,
                     topVer,
-                    forceTransformBackups,
-                    this.updateReq.subjectId(),
-                    this.updateReq.taskNameHash(),
-                    forceTransformBackups ? this.updateReq.invokeArguments() : 
null,
-                    cctx.deploymentEnabled(),
-                    this.updateReq.keepBinary(),
-                    this.updateReq.skipStore());
+                    forceTransformBackups);
 
                 mappings.put(nodeId, updateReq);
             }
@@ -336,7 +323,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridFutureAdapte
     final boolean registerResponse(UUID nodeId) {
         int resCnt0;
 
-        GridDhtAtomicUpdateRequest req = mappings != null ? 
mappings.get(nodeId) : null;
+        GridDhtAtomicAbstractUpdateRequest req = mappings != null ? 
mappings.get(nodeId) : null;
 
         if (req != null) {
             synchronized (this) {
@@ -365,7 +352,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridFutureAdapte
      */
     final void map() {
         if (!F.isEmpty(mappings)) {
-            for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+            for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
                 try {
                     cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
@@ -412,6 +399,24 @@ public abstract class GridDhtAtomicAbstractUpdateFuture 
extends GridFutureAdapte
     }
 
     /**
+     * @param node Node.
+     * @param futVer Future version.
+     * @param writeVer Update version.
+     * @param syncMode Write synchronization mode.
+     * @param topVer Topology version.
+     * @param forceTransformBackups Force transform backups flag.
+     * @return Request.
+     */
+    protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(
+        ClusterNode node,
+        GridCacheVersion futVer,
+        GridCacheVersion writeVer,
+        CacheWriteSynchronizationMode syncMode,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean forceTransformBackups
+    );
+
+    /**
      * Callback for backup update response.
      *
      * @param nodeId Backup node ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
new file mode 100644
index 0000000..f0bea07
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.util.UUID;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class GridDhtAtomicAbstractUpdateRequest extends 
GridCacheMessage implements GridCacheDeployable {
+    /** Message index. */
+    public static final int CACHE_MSG_IDX = nextIndexId();
+
+    /** Node ID. */
+    @GridDirectTransient
+    protected UUID nodeId;
+
+    /** On response flag. Access should be synced on future. */
+    @GridDirectTransient
+    private boolean onRes;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    protected GridDhtAtomicAbstractUpdateRequest() {
+        // N-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cacheId Cache ID.
+     * @param nodeId Node ID.
+     */
+    protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId) {
+        this.cacheId = cacheId;
+        this.nodeId = nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int lookupIndex() {
+        return CACHE_MSG_IDX;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Keep binary flag.
+     */
+    public abstract boolean keepBinary();
+
+    /**
+     * @return Skip write-through to a persistent storage.
+     */
+    public abstract boolean skipStore();
+
+    /**
+     * @return {@code True} if on response flag changed.
+     */
+    public boolean onResponse() {
+        return !onRes && (onRes = true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /**
+     * @return Force transform backups flag.
+     */
+    public abstract boolean forceTransformBackups();
+
+    /** {@inheritDoc} */
+    @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
+        return ctx.atomicMessageLogger();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        cleanup();
+    }
+
+    /**
+     * @param key Key to add.
+     * @param val Value, {@code null} if should be removed.
+     * @param entryProcessor Entry processor.
+     * @param ttl TTL (optional).
+     * @param conflictExpireTime Conflict expire time (optional).
+     * @param conflictVer Conflict version (optional).
+     * @param addPrevVal If {@code true} adds previous value.
+     * @param partId Partition.
+     * @param prevVal Previous value.
+     * @param updateCntr Update counter.
+     */
+    public abstract void addWriteValue(KeyCacheObject key,
+        @Nullable CacheObject val,
+        EntryProcessor<Object, Object, Object> entryProcessor,
+        long ttl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer,
+        boolean addPrevVal,
+        int partId,
+        @Nullable CacheObject prevVal,
+        @Nullable Long updateCntr
+    );
+
+    /**
+     * @param key Key to add.
+     * @param val Value, {@code null} if should be removed.
+     * @param entryProcessor Entry processor.
+     * @param ttl TTL.
+     * @param expireTime Expire time.
+     */
+    public abstract void addNearWriteValue(KeyCacheObject key,
+        @Nullable CacheObject val,
+        EntryProcessor<Object, Object, Object> entryProcessor,
+        long ttl,
+        long expireTime);
+
+    /**
+     * Cleanup values not needed after message was sent.
+     */
+    protected abstract void cleanup();
+
+    /**
+     * @return Subject ID.
+     */
+    public abstract UUID subjectId();
+
+    /**
+     * @return Task name.
+     */
+    public abstract int taskNameHash();
+
+    /**
+     * @return Version assigned on primary node.
+     */
+    public abstract GridCacheVersion futureVersion();
+
+    /**
+     * @return Write version.
+     */
+    public abstract GridCacheVersion writeVersion();
+
+    /**
+     * @return Cache write synchronization mode.
+     */
+    public abstract CacheWriteSynchronizationMode writeSynchronizationMode();
+
+    /**
+     * @return Keys size.
+     */
+    public abstract int size();
+
+    /**
+     * @return Keys size.
+     */
+    public abstract int nearSize();
+
+    /**
+     * @param key Key to check.
+     * @return {@code true} if request keys contain key.
+     */
+    public abstract boolean hasKey(KeyCacheObject key);
+
+    /**
+     * @param idx Key index.
+     * @return Key.
+     */
+    public abstract KeyCacheObject key(int idx);
+
+    /**
+     * @param idx Partition index.
+     * @return Partition id.
+     */
+    public abstract int partitionId(int idx);
+
+    /**
+     * @param updCntr Update counter.
+     * @return Update counter.
+     */
+    public abstract Long updateCounter(int updCntr);
+
+    /**
+     * @param idx Near key index.
+     * @return Key.
+     */
+    public abstract KeyCacheObject nearKey(int idx);
+
+    /**
+     * @param idx Key index.
+     * @return Value.
+     */
+    @Nullable public abstract CacheObject value(int idx);
+
+    /**
+     * @param idx Key index.
+     * @return Value.
+     */
+    @Nullable public abstract CacheObject previousValue(int idx);
+
+    /**
+     * @param idx Key index.
+     * @return Entry processor.
+     */
+    @Nullable public abstract EntryProcessor<Object, Object, Object> 
entryProcessor(int idx);
+
+    /**
+     * @param idx Near key index.
+     * @return Value.
+     */
+    @Nullable public abstract CacheObject nearValue(int idx);
+
+    /**
+     * @param idx Key index.
+     * @return Transform closure.
+     */
+    @Nullable public abstract EntryProcessor<Object, Object, Object> 
nearEntryProcessor(int idx);
+
+    /**
+     * @param idx Index.
+     * @return Conflict version.
+     */
+    @Nullable public abstract GridCacheVersion conflictVersion(int idx);
+
+    /**
+     * @param idx Index.
+     * @return TTL.
+     */
+    public abstract long ttl(int idx);
+
+    /**
+     * @param idx Index.
+     * @return TTL for near cache update.
+     */
+    public abstract long nearTtl(int idx);
+
+    /**
+     * @param idx Index.
+     * @return Conflict expire time.
+     */
+    public abstract long conflictExpireTime(int idx);
+
+    /**
+     * @param idx Index.
+     * @return Expire time for near cache update.
+     */
+    public abstract long nearExpireTime(int idx);
+
+    /**
+     * @return Optional arguments for entry processor.
+     */
+    @Nullable public abstract Object[] invokeArguments();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d7eb062..2a7055d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -360,11 +360,11 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         ctx.io().addHandler(
             ctx.cacheId(),
-            GridDhtAtomicUpdateRequest.class,
-            new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+            GridDhtAtomicAbstractUpdateRequest.class,
+            new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() {
                 @Override public void apply(
                     UUID nodeId,
-                    GridDhtAtomicUpdateRequest req
+                    GridDhtAtomicAbstractUpdateRequest req
                 ) {
                     processDhtAtomicUpdateRequest(
                         nodeId,
@@ -3100,12 +3100,9 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         GridNearAtomicAbstractUpdateFuture fut =
             
(GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
 
-        if (fut != null) {
-            if (fut instanceof GridNearAtomicSingleUpdateFuture)
-                ((GridNearAtomicSingleUpdateFuture)fut).onResult(nodeId, res, 
false);
-            else
-                ((GridNearAtomicUpdateFuture)fut).onResult(nodeId, res, false);
-        }
+        if (fut != null)
+            fut.onResult(nodeId, res, false);
+
         else
             U.warn(msgLog, "Failed to find near update future for update 
response (will ignore) " +
                 "[futId" + res.futureVersion() + ", node=" + nodeId + ", res=" 
+ res + ']');
@@ -3115,7 +3112,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
      * @param nodeId Sender node ID.
      * @param req Dht atomic update request.
      */
-    private void processDhtAtomicUpdateRequest(UUID nodeId, 
GridDhtAtomicUpdateRequest req) {
+    private void processDhtAtomicUpdateRequest(UUID nodeId, 
GridDhtAtomicAbstractUpdateRequest req) {
         if (msgLog.isDebugEnabled()) {
             msgLog.debug("Received DHT atomic update request [futId=" + 
req.futureVersion() +
                 ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index f83a7b7..656caab 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -20,7 +20,9 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -30,6 +32,8 @@ import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.jetbrains.annotations.NotNull;
 
 /**
  *
@@ -38,6 +42,9 @@ class GridDhtAtomicSingleUpdateFuture extends 
GridDhtAtomicAbstractUpdateFuture
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = 
IgniteProductVersion.fromString("1.7.4");
+
     /** Future keys. */
     private KeyCacheObject key;
 
@@ -87,6 +94,49 @@ class GridDhtAtomicSingleUpdateFuture extends 
GridDhtAtomicAbstractUpdateFuture
     }
 
     /** {@inheritDoc} */
+    @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
+        ClusterNode node,
+        GridCacheVersion futVer,
+        GridCacheVersion writeVer,
+        CacheWriteSynchronizationMode syncMode,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean forceTransformBackups
+    ) {
+        if (canUseSingleRequest(node)) {
+            assert !forceTransformBackups;
+
+            return new GridDhtAtomicSingleUpdateRequest(
+                cctx.cacheId(),
+                node.id(),
+                futVer,
+                writeVer,
+                syncMode,
+                topVer,
+                updateReq.subjectId(),
+                updateReq.taskNameHash(),
+                cctx.deploymentEnabled(),
+                updateReq.keepBinary(),
+                updateReq.skipStore());
+        }
+        else {
+            return new GridDhtAtomicUpdateRequest(
+                cctx.cacheId(),
+                node.id(),
+                futVer,
+                writeVer,
+                syncMode,
+                topVer,
+                forceTransformBackups,
+                updateReq.subjectId(),
+                updateReq.taskNameHash(),
+                forceTransformBackups ? updateReq.invokeArguments() : null,
+                cctx.deploymentEnabled(),
+                updateReq.keepBinary(),
+                updateReq.skipStore());
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse 
updateRes) {
         if (log.isDebugEnabled())
             log.debug("Received DHT atomic update future result [nodeId=" + 
nodeId + ", updateRes=" + updateRes + ']');
@@ -114,6 +164,17 @@ class GridDhtAtomicSingleUpdateFuture extends 
GridDhtAtomicAbstractUpdateFuture
         updateRes.addFailedKey(key, err);
     }
 
+    /**
+     * @param node Target node
+     * @return {@code true} if target node supports {@link 
GridNearAtomicSingleUpdateRequest}
+     */
+    private boolean canUseSingleRequest(ClusterNode node) {
+        return node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) 
>= 0 &&
+            cctx.expiry() == null &&
+            updateReq.expiry() == null &&
+            !updateReq.hasConflictData();
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtAtomicSingleUpdateFuture.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
new file mode 100644
index 0000000..a03d948
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -0,0 +1,678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK;
+import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
+
+/**
+ *
+ */
+public class GridDhtAtomicSingleUpdateRequest extends 
GridDhtAtomicAbstractUpdateRequest {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Near cache key flag. */
+    private static final int NEAR_FLAG_MASK = 0x80;
+
+    /** Future version. */
+    protected GridCacheVersion futVer;
+
+    /** Write version. */
+    protected GridCacheVersion writeVer;
+
+    /** Write synchronization mode. */
+    protected CacheWriteSynchronizationMode syncMode;
+
+    /** Topology version. */
+    protected AffinityTopologyVersion topVer;
+
+    /** Subject ID. */
+    protected UUID subjId;
+
+    /** Task name hash. */
+    protected int taskNameHash;
+
+    /** Additional flags. */
+    protected byte flags;
+
+    /** Key to update. */
+    @GridToStringInclude
+    protected KeyCacheObject key;
+
+    /** Value to update. */
+    @GridToStringInclude
+    protected CacheObject val;
+
+    /** Previous value. */
+    @GridToStringInclude
+    protected CacheObject prevVal;
+
+    /** Partition. */
+    protected long updateCntr;
+
+    /** */
+    protected int partId;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridDhtAtomicSingleUpdateRequest() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cacheId Cache ID.
+     * @param nodeId Node ID.
+     * @param futVer Future version.
+     * @param writeVer Write version for cache values.
+     * @param syncMode Cache write synchronization mode.
+     * @param topVer Topology version.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param addDepInfo Deployment info.
+     * @param keepBinary Keep binary flag.
+     * @param skipStore Skip store flag.
+     */
+    GridDhtAtomicSingleUpdateRequest(
+        int cacheId,
+        UUID nodeId,
+        GridCacheVersion futVer,
+        GridCacheVersion writeVer,
+        CacheWriteSynchronizationMode syncMode,
+        @NotNull AffinityTopologyVersion topVer,
+        UUID subjId,
+        int taskNameHash,
+        boolean addDepInfo,
+        boolean keepBinary,
+        boolean skipStore
+    ) {
+        super(cacheId, nodeId);
+        this.futVer = futVer;
+        this.writeVer = writeVer;
+        this.syncMode = syncMode;
+        this.topVer = topVer;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.addDepInfo = addDepInfo;
+
+        if (skipStore)
+            setFlag(true, SKIP_STORE_FLAG_MASK);
+        if (keepBinary)
+            setFlag(true, KEEP_BINARY_FLAG_MASK);
+    }
+
+    /**
+     * @param key Key to add.
+     * @param val Value, {@code null} if should be removed.
+     * @param entryProcessor Entry processor.
+     * @param ttl TTL (optional).
+     * @param conflictExpireTime Conflict expire time (optional).
+     * @param conflictVer Conflict version (optional).
+     * @param addPrevVal If {@code true} adds previous value.
+     * @param partId Partition.
+     * @param prevVal Previous value.
+     * @param updateCntr Update counter.
+     */
+    @Override public void addWriteValue(KeyCacheObject key,
+        @Nullable CacheObject val,
+        EntryProcessor<Object, Object, Object> entryProcessor,
+        long ttl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer,
+        boolean addPrevVal,
+        int partId,
+        @Nullable CacheObject prevVal,
+        @Nullable Long updateCntr
+    ) {
+        assert entryProcessor == null;
+        assert ttl <= 0 : ttl;
+        assert conflictExpireTime <= 0 : conflictExpireTime;
+        assert conflictVer == null : conflictVer;
+
+        near(false);
+
+        this.key = key;
+        this.partId = partId;
+        this.val = val;
+
+        if (addPrevVal)
+            this.prevVal = prevVal;
+
+        if (updateCntr != null)
+            this.updateCntr = updateCntr;
+    }
+
+    /**
+     * @param key Key to add.
+     * @param val Value, {@code null} if should be removed.
+     * @param entryProcessor Entry processor.
+     * @param ttl TTL.
+     * @param expireTime Expire time.
+     */
+    @Override public void addNearWriteValue(KeyCacheObject key,
+        @Nullable CacheObject val,
+        EntryProcessor<Object, Object, Object> entryProcessor,
+        long ttl,
+        long expireTime) {
+        assert entryProcessor == null;
+        assert ttl <= 0 : ttl;
+
+        near(true);
+
+        this.key = key;
+        this.val = val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean forceTransformBackups() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return key != null ? near() ? 0 : 1 : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int nearSize() {
+        return key != null ? near() ? 1 : 0 : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasKey(KeyCacheObject key) {
+        return !near() && F.eq(this.key, key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean skipStore() {
+        return isFlag(SKIP_STORE_FLAG_MASK);
+    }
+
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject key(int idx) {
+        assert idx == 0 : idx;
+
+        return near() ? null : key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partitionId(int idx) {
+        assert idx == 0 : idx;
+
+        return partId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Long updateCounter(int updCntr) {
+        assert updCntr == 0 : updCntr;
+
+        return updateCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject nearKey(int idx) {
+        assert idx == 0 : idx;
+
+        return near() ? key : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheObject value(int idx) {
+        assert idx == 0 : idx;
+
+        return near() ? null : val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion futureVersion() {
+        return futVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion writeVersion() {
+        return writeVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int taskNameHash() {
+        return taskNameHash;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID subjectId() {
+        return subjId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
+        return syncMode;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheObject previousValue(int idx) {
+        assert idx == 0 : idx;
+
+        return prevVal;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheObject nearValue(int idx) {
+        assert idx == 0 : idx;
+
+        return near() ? val : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public EntryProcessor<Object, Object, Object> 
entryProcessor(int idx) {
+        assert idx == 0 : idx;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public EntryProcessor<Object, Object, Object> 
nearEntryProcessor(int idx) {
+        assert idx == 0 : idx;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
+        assert idx == 0 : idx;
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long ttl(int idx) {
+        assert idx == 0 : idx;
+
+        return CU.TTL_NOT_CHANGED;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long nearTtl(int idx) {
+        assert idx == 0 : idx;
+
+        return CU.TTL_NOT_CHANGED;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long conflictExpireTime(int idx) {
+        assert idx == 0 : idx;
+
+        return CU.EXPIRE_TIME_CALCULATE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long nearExpireTime(int idx) {
+        assert idx == 0 : idx;
+
+        return CU.EXPIRE_TIME_CALCULATE;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public Object[] invokeArguments() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean keepBinary() {
+        return isFlag(KEEP_BINARY_FLAG_MASK);
+    }
+
+    /**
+     *
+     */
+    private boolean near() {
+        return isFlag(NEAR_FLAG_MASK);
+    }
+
+    /**
+     *
+     */
+    private void near(boolean near) {
+        setFlag(near, NEAR_FLAG_MASK);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws 
IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        prepareMarshalObject(key, cctx);
+
+        prepareMarshalObject(val, cctx);
+
+        prepareMarshalObject(prevVal, cctx);
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, 
ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        finishUnmarshalObject(key, cctx, ldr);
+
+        finishUnmarshalObject(val, cctx, ldr);
+
+        finishUnmarshalObject(prevVal, cctx, ldr);
+
+        key.partition(partId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMessage("futVer", futVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeMessage("key", key))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeInt("partId", partId))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeMessage("prevVal", prevVal))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeUuid("subjId", subjId))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeByte("syncMode", syncMode != null ? 
(byte)syncMode.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeLong("updateCntr", updateCntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeMessage("val", val))
+                    return false;
+
+                writer.incrementState();
+
+            case 14:
+                if (!writer.writeMessage("writeVer", writeVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                futVer = reader.readMessage("futVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                key = reader.readMessage("key");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                partId = reader.readInt("partId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                prevVal = reader.readMessage("prevVal");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                byte syncModeOrd;
+
+                syncModeOrd = reader.readByte("syncMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                syncMode = 
CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+                reader.incrementState();
+
+            case 10:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                updateCntr = reader.readLong("updateCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                val = reader.readMessage("val");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 14:
+                writeVer = reader.readMessage("writeVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridDhtAtomicSingleUpdateRequest.class);
+    }
+
+    /**
+     * @param obj CacheObject to marshal
+     * @param ctx context
+     * @throws IgniteCheckedException if error
+     */
+    private void prepareMarshalObject(CacheObject obj, GridCacheContext ctx) 
throws IgniteCheckedException {
+        if (obj != null)
+            obj.prepareMarshal(ctx.cacheObjectContext());
+    }
+
+    /**
+     * @param obj CacheObject un to marshal
+     * @param ctx context
+     * @param ldr class loader
+     * @throws IgniteCheckedException if error
+     */
+    private void finishUnmarshalObject(@Nullable CacheObject obj, 
GridCacheContext ctx,
+        ClassLoader ldr) throws IgniteCheckedException {
+        if (obj != null)
+            obj.finishUnmarshal(ctx.cacheObjectContext(), ldr);
+    }
+
+    /**
+     * Cleanup values not needed after message was sent.
+     */
+    @Override protected void cleanup() {
+        val = null;
+        prevVal = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -36;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 15;
+    }
+
+    /**
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    private void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reags flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    private boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtAtomicSingleUpdateRequest.class, this, 
"super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 864aadd..dd1f1c4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -23,7 +23,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -33,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * DHT atomic cache backup update future.
@@ -118,6 +121,29 @@ class GridDhtAtomicUpdateFuture extends 
GridDhtAtomicAbstractUpdateFuture {
     }
 
     /** {@inheritDoc} */
+    @Override protected GridDhtAtomicAbstractUpdateRequest 
createRequest(ClusterNode node,
+        GridCacheVersion futVer,
+        GridCacheVersion writeVer,
+        CacheWriteSynchronizationMode syncMode,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean forceTransformBackups) {
+        return new GridDhtAtomicUpdateRequest(
+            cctx.cacheId(),
+            node.id(),
+            futVer,
+            writeVer,
+            syncMode,
+            topVer,
+            forceTransformBackups,
+            updateReq.subjectId(),
+            updateReq.taskNameHash(),
+            forceTransformBackups ? updateReq.invokeArguments() : null,
+            cctx.deploymentEnabled(),
+            updateReq.keepBinary(),
+            updateReq.skipStore());
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtAtomicUpdateFuture.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 55f7560..f2fbb0e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -20,25 +20,22 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -52,17 +49,10 @@ import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_ST
 /**
  * Lite dht cache backup update request.
  */
-public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements 
GridCacheDeployable {
+public class GridDhtAtomicUpdateRequest extends 
GridDhtAtomicAbstractUpdateRequest {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Message index. */
-    public static final int CACHE_MSG_IDX = nextIndexId();
-
-    /** Node ID. */
-    @GridDirectTransient
-    private UUID nodeId;
-
     /** Future version. */
     private GridCacheVersion futVer;
 
@@ -151,10 +141,6 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
     /** Partition. */
     private GridLongList updateCntrs;
 
-    /** On response flag. Access should be synced on future. */
-    @GridDirectTransient
-    private boolean onRes;
-
     /** */
     @GridDirectTransient
     private List<Integer> partIds;
@@ -162,9 +148,7 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
     /** Keep binary flag. */
     private boolean keepBinary;
 
-    /**
-     * Additional flags.
-     */
+    /** Additional flags. */
     private byte flags;
 
     /**
@@ -204,10 +188,8 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
         boolean keepBinary,
         boolean skipStore
     ) {
-        assert invokeArgs == null || forceTransformBackups;
+        super(cacheId, nodeId);
 
-        this.cacheId = cacheId;
-        this.nodeId = nodeId;
         this.futVer = futVer;
         this.writeVer = writeVer;
         this.syncMode = syncMode;
@@ -215,12 +197,14 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
         this.forceTransformBackups = forceTransformBackups;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
+
+        assert invokeArgs == null || forceTransformBackups;
+
         this.invokeArgs = invokeArgs;
         this.addDepInfo = addDepInfo;
         this.keepBinary = keepBinary;
 
-        if (skipStore)
-            flags = (byte)(flags | SKIP_STORE_FLAG_MASK);
+        setFlag(skipStore, SKIP_STORE_FLAG_MASK);
 
         keys = new ArrayList<>();
         partIds = new ArrayList<>();
@@ -233,26 +217,8 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
             vals = new ArrayList<>();
     }
 
-    /**
-     * @return Force transform backups flag.
-     */
-    public boolean forceTransformBackups() {
-        return forceTransformBackups;
-    }
-
-    /**
-     * @param key Key to add.
-     * @param val Value, {@code null} if should be removed.
-     * @param entryProcessor Entry processor.
-     * @param ttl TTL (optional).
-     * @param conflictExpireTime Conflict expire time (optional).
-     * @param conflictVer Conflict version (optional).
-     * @param addPrevVal If {@code true} adds previous value.
-     * @param partId Partition.
-     * @param prevVal Previous value.
-     * @param updateCntr Update counter.
-     */
-    public void addWriteValue(KeyCacheObject key,
+    /** {@inheritDoc} */
+    @Override public void addWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
@@ -328,14 +294,8 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
             conflictExpireTimes.add(conflictExpireTime);
     }
 
-    /**
-     * @param key Key to add.
-     * @param val Value, {@code null} if should be removed.
-     * @param entryProcessor Entry processor.
-     * @param ttl TTL.
-     * @param expireTime Expire time.
-     */
-    public void addNearWriteValue(KeyCacheObject key,
+    /** {@inheritDoc} */
+    @Override public void addNearWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
@@ -387,183 +347,114 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
     }
 
     /** {@inheritDoc} */
-    @Override public int lookupIndex() {
-        return CACHE_MSG_IDX;
-    }
-
-    /**
-     * @return Node ID.
-     */
-    public UUID nodeId() {
-        return nodeId;
+    @Override public boolean forceTransformBackups() {
+        return forceTransformBackups;
     }
 
-    /**
-     * @return Subject ID.
-     */
-    public UUID subjectId() {
+    /** {@inheritDoc} */
+    @Override public UUID subjectId() {
         return subjId;
     }
 
-    /**
-     * @return Task name.
-     */
-    public int taskNameHash() {
+    /** {@inheritDoc} */
+    @Override public int taskNameHash() {
         return taskNameHash;
     }
 
-    /**
-     * @return Keys size.
-     */
-    public int size() {
-        return keys.size();
-    }
-
-    /**
-     * @return Keys size.
-     */
-    public int nearSize() {
-        return nearKeys != null ? nearKeys.size() : 0;
-    }
-
-    /**
-     * @return Version assigned on primary node.
-     */
-    public GridCacheVersion futureVersion() {
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion futureVersion() {
         return futVer;
     }
 
-    /**
-     * @return Write version.
-     */
-    public GridCacheVersion writeVersion() {
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion writeVersion() {
         return writeVer;
     }
 
-    /**
-     * @return Cache write synchronization mode.
-     */
-    public CacheWriteSynchronizationMode writeSynchronizationMode() {
+    /** {@inheritDoc} */
+    @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
         return syncMode;
     }
 
-    /**
-     * @return Topology version.
-     */
+    /** {@inheritDoc} */
     @Override public AffinityTopologyVersion topologyVersion() {
         return topVer;
     }
 
-    /**
-     * @return Keys.
-     */
-    public Collection<KeyCacheObject> keys() {
-        return keys;
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return keys.size();
     }
 
-    /**
-     * @param idx Key index.
-     * @return Key.
-     */
-    public KeyCacheObject key(int idx) {
-        return keys.get(idx);
+    /** {@inheritDoc} */
+    @Override public int nearSize() {
+        return nearKeys != null ? nearKeys.size() : 0;
     }
 
-    /**
-     * @param idx Partition index.
-     * @return Partition id.
-     */
-    public int partitionId(int idx) {
-        return partIds.get(idx);
+    /** {@inheritDoc} */
+    @Override public boolean hasKey(KeyCacheObject key) {
+        return F.contains(keys, key);
     }
 
-    /**
-     * @return Skip write-through to a persistent storage.
-     */
-    public boolean skipStore() {
-        return (flags & SKIP_STORE_FLAG_MASK) == SKIP_STORE_FLAG_MASK;
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject key(int idx) {
+        return keys.get(idx);
     }
 
-    /**
-     * @param updCntr Update counter.
-     * @return Update counter.
-     */
-    public Long updateCounter(int updCntr) {
+    /** {@inheritDoc} */
+    @Override public int partitionId(int idx) {
+        return partIds.get(idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Long updateCounter(int updCntr) {
         if (updateCntrs != null && updCntr < updateCntrs.size())
             return updateCntrs.get(updCntr);
 
         return null;
     }
 
-    /**
-     * @param idx Near key index.
-     * @return Key.
-     */
-    public KeyCacheObject nearKey(int idx) {
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject nearKey(int idx) {
         return nearKeys.get(idx);
     }
 
-    /**
-     * @return Keep binary flag.
-     */
-    public boolean keepBinary() {
-        return keepBinary;
-    }
-
-    /**
-     * @param idx Key index.
-     * @return Value.
-     */
-    @Nullable public CacheObject value(int idx) {
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheObject value(int idx) {
         if (vals != null)
             return vals.get(idx);
 
         return null;
     }
 
-    /**
-     * @param idx Key index.
-     * @return Value.
-     */
-    @Nullable public CacheObject previousValue(int idx) {
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheObject previousValue(int idx) {
         if (prevVals != null)
             return prevVals.get(idx);
 
         return null;
     }
 
-    /**
-     * @param idx Key index.
-     * @return Entry processor.
-     */
-    @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int 
idx) {
+    /** {@inheritDoc} */
+    @Override @Nullable public EntryProcessor<Object, Object, Object> 
entryProcessor(int idx) {
         return entryProcessors == null ? null : entryProcessors.get(idx);
     }
 
-    /**
-     * @param idx Near key index.
-     * @return Value.
-     */
-    @Nullable public CacheObject nearValue(int idx) {
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheObject nearValue(int idx) {
         if (nearVals != null)
             return nearVals.get(idx);
 
         return null;
     }
 
-    /**
-     * @param idx Key index.
-     * @return Transform closure.
-     */
-    @Nullable public EntryProcessor<Object, Object, Object> 
nearEntryProcessor(int idx) {
+    /** {@inheritDoc} */
+    @Override @Nullable public EntryProcessor<Object, Object, Object> 
nearEntryProcessor(int idx) {
         return nearEntryProcessors == null ? null : 
nearEntryProcessors.get(idx);
     }
 
-    /**
-     * @param idx Index.
-     * @return Conflict version.
-     */
-    @Nullable public GridCacheVersion conflictVersion(int idx) {
+    /** {@inheritDoc} */
+    @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
         if (conflictVers != null) {
             assert idx >= 0 && idx < conflictVers.size();
 
@@ -573,11 +464,8 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
         return null;
     }
 
-    /**
-     * @param idx Index.
-     * @return TTL.
-     */
-    public long ttl(int idx) {
+    /** {@inheritDoc} */
+    @Override public long ttl(int idx) {
         if (ttls != null) {
             assert idx >= 0 && idx < ttls.size();
 
@@ -587,11 +475,8 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
         return CU.TTL_NOT_CHANGED;
     }
 
-    /**
-     * @param idx Index.
-     * @return TTL for near cache update.
-     */
-    public long nearTtl(int idx) {
+    /** {@inheritDoc} */
+    @Override public long nearTtl(int idx) {
         if (nearTtls != null) {
             assert idx >= 0 && idx < nearTtls.size();
 
@@ -601,11 +486,8 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
         return CU.TTL_NOT_CHANGED;
     }
 
-    /**
-     * @param idx Index.
-     * @return Conflict expire time.
-     */
-    public long conflictExpireTime(int idx) {
+    /** {@inheritDoc} */
+    @Override public long conflictExpireTime(int idx) {
         if (conflictExpireTimes != null) {
             assert idx >= 0 && idx < conflictExpireTimes.size();
 
@@ -615,11 +497,8 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
         return CU.EXPIRE_TIME_CALCULATE;
     }
 
-    /**
-     * @param idx Index.
-     * @return Expire time for near cache update.
-     */
-    public long nearExpireTime(int idx) {
+    /** {@inheritDoc} */
+    @Override public long nearExpireTime(int idx) {
         if (nearExpireTimes != null) {
             assert idx >= 0 && idx < nearExpireTimes.size();
 
@@ -629,17 +508,18 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
         return CU.EXPIRE_TIME_CALCULATE;
     }
 
-    /**
-     * @return {@code True} if on response flag changed.
-     */
-    public boolean onResponse() {
-        return !onRes && (onRes = true);
+    /** {@inheritDoc} */
+    @Override public boolean keepBinary() {
+        return keepBinary;
     }
 
-    /**
-     * @return Optional arguments for entry processor.
-     */
-    @Nullable public Object[] invokeArguments() {
+    /** {@inheritDoc} */
+    @Override public boolean skipStore() {
+        return isFlag(SKIP_STORE_FLAG_MASK);
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public Object[] invokeArguments() {
         return invokeArgs;
     }
 
@@ -711,16 +591,6 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
     }
 
     /** {@inheritDoc} */
-    @Override public boolean addDeploymentInfo() {
-        return addDepInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
-        return ctx.atomicMessageLogger();
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 
@@ -1083,14 +953,7 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
     }
 
     /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        cleanup();
-    }
-
-    /**
-     * Cleanup values not needed after message was sent.
-     */
-    private void cleanup() {
+    @Override protected void cleanup() {
         nearVals = null;
         prevVals = null;
     }
@@ -1105,6 +968,27 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
         return 26;
     }
 
+    /**
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    private void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reags flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    private boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtAtomicUpdateRequest.class, this, "super", 
super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index bee2ecd..bae9e3a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -223,4 +223,9 @@ public abstract class GridNearAtomicAbstractUpdateRequest 
extends GridCacheMessa
      * @return Key.
      */
     public abstract KeyCacheObject key(int idx);
+
+    /**
+     * @return {@code True} if request does not have conflict data.
+     */
+    public abstract boolean hasConflictData();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index b733d7b..c785828 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -41,6 +41,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -68,24 +69,49 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
     /** Future version. */
     private GridCacheVersion futVer;
 
-    /** Fast map flag. */
-    private boolean fastMap;
-
     /** Update version. Set to non-null if fastMap is {@code true}. */
     private GridCacheVersion updateVer;
 
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
-    /** Topology locked flag. Set if atomic update is performed inside TX or 
explicit lock. */
-    private boolean topLocked;
-
     /** Write synchronization mode. */
     private CacheWriteSynchronizationMode syncMode;
 
     /** Update operation. */
     private GridCacheOperation op;
 
+    /** Subject ID. */
+    protected UUID subjId;
+
+    /** Task name hash. */
+    protected int taskNameHash;
+
+    /** */
+    @GridDirectTransient
+    private GridNearAtomicUpdateResponse res;
+
+    /** Fast map flag. */
+    protected boolean fastMap;
+
+    /** Topology locked flag. Set if atomic update is performed inside TX or 
explicit lock. */
+    protected boolean topLocked;
+
+    /** Flag indicating whether request contains primary keys. */
+    protected boolean hasPrimary;
+
+    /** Skip write-through to a persistent storage. */
+    protected boolean skipStore;
+
+    /** */
+    protected boolean clientReq;
+
+    /** Keep binary flag. */
+    protected boolean keepBinary;
+
+    /** Return value flag. */
+    protected boolean retval;
+
     /** Keys to update. */
     @GridToStringInclude
     @GridDirectCollection(KeyCacheObject.class)
@@ -107,13 +133,6 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
     @GridDirectCollection(byte[].class)
     private List<byte[]> entryProcessorsBytes;
 
-    /** Optional arguments for entry processor. */
-    @GridDirectTransient
-    private Object[] invokeArgs;
-
-    /** Entry processor arguments bytes. */
-    private byte[][] invokeArgsBytes;
-
     /** Conflict versions. */
     @GridDirectCollection(GridCacheVersion.class)
     private List<GridCacheVersion> conflictVers;
@@ -124,8 +143,12 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
     /** Conflict expire times. */
     private GridLongList conflictExpireTimes;
 
-    /** Return value flag. */
-    private boolean retval;
+    /** Optional arguments for entry processor. */
+    @GridDirectTransient
+    private Object[] invokeArgs;
+
+    /** Entry processor arguments bytes. */
+    private byte[][] invokeArgsBytes;
 
     /** Expiry policy. */
     @GridDirectTransient
@@ -137,28 +160,6 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
     /** Filter. */
     private CacheEntryPredicate[] filter;
 
-    /** Flag indicating whether request contains primary keys. */
-    private boolean hasPrimary;
-
-    /** Subject ID. */
-    private UUID subjId;
-
-    /** Task name hash. */
-    private int taskNameHash;
-
-    /** Skip write-through to a persistent storage. */
-    private boolean skipStore;
-
-    /** */
-    private boolean clientReq;
-
-    /** Keep binary flag. */
-    private boolean keepBinary;
-
-    /** */
-    @GridDirectTransient
-    private GridNearAtomicUpdateResponse res;
-
     /** Maximum possible size of inner collections. */
     @GridDirectTransient
     private int initSize;
@@ -523,7 +524,7 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public CacheEntryPredicate[] filter() {
+    @Override @Nullable public CacheEntryPredicate[] filter() {
         return filter;
     }
 
@@ -533,11 +534,19 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
+    @Override public boolean hasConflictData() {
+        return F.size(conflictVers) > 0 || conflictTtls != null || 
conflictExpireTimes != null;
+    }
+
+    /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws 
IgniteCheckedException {
         super.prepareMarshal(ctx);
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
+        if (expiryPlc != null && expiryPlcBytes == null)
+            expiryPlcBytes = CU.marshal(cctx, new 
IgniteExternalizableExpiryPolicy(expiryPlc));
+
         prepareMarshalCacheObjects(keys, cctx);
 
         if (filter != null) {
@@ -555,9 +564,6 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
                 filter = null;
         }
 
-        if (expiryPlc != null && expiryPlcBytes == null)
-            expiryPlcBytes = CU.marshal(cctx, new 
IgniteExternalizableExpiryPolicy(expiryPlc));
-
         if (op == TRANSFORM) {
             // force addition of deployment info for entry processors if P2P 
is enabled globally.
             if (!addDepInfo && ctx.deploymentEnabled())
@@ -579,8 +585,18 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
+        if (expiryPlcBytes != null && expiryPlc == null)
+            expiryPlc = U.unmarshal(ctx, expiryPlcBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
+
         finishUnmarshalCacheObjects(keys, cctx, ldr);
 
+        if (filter != null) {
+            for (CacheEntryPredicate p : filter) {
+                if (p != null)
+                    p.finishUnmarshal(cctx, ldr);
+            }
+        }
+
         if (op == TRANSFORM) {
             if (entryProcessors == null)
                 entryProcessors = unmarshalCollection(entryProcessorsBytes, 
ctx, ldr);
@@ -591,16 +607,6 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
         else
             finishUnmarshalCacheObjects(vals, cctx, ldr);
 
-        if (filter != null) {
-            for (CacheEntryPredicate p : filter) {
-                if (p != null)
-                    p.finishUnmarshal(cctx, ldr);
-            }
-        }
-
-        if (expiryPlcBytes != null && expiryPlc == null)
-            expiryPlc = U.unmarshal(ctx, expiryPlcBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()));
-
         if (partIds != null && !partIds.isEmpty()) {
             assert partIds.size() == keys.size();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 211b472..f3b9726 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -226,6 +226,11 @@ public class GridNearAtomicSingleUpdateRequest extends 
GridNearAtomicAbstractSin
         return CU.EXPIRE_TIME_CALCULATE;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean hasConflictData() {
+        return false;
+    }
+
     /**
      * {@inheritDoc}
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index b5b2c72..a8219b0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -41,8 +41,8 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
@@ -302,15 +302,13 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
      */
     public void processDhtAtomicUpdateRequest(
         UUID nodeId,
-        GridDhtAtomicUpdateRequest req,
+        GridDhtAtomicAbstractUpdateRequest req,
         GridDhtAtomicUpdateResponse res
     ) {
         GridCacheVersion ver = req.writeVersion();
 
         assert ver != null;
 
-        Collection<KeyCacheObject> backupKeys = req.keys();
-
         boolean intercept = req.forceTransformBackups() && 
ctx.config().getInterceptor() != null;
 
         String taskName = 
ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
@@ -329,7 +327,7 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
                             break;
                         }
 
-                        if (F.contains(backupKeys, key)) { // Reader became 
backup.
+                        if (req.hasKey(key)) { // Reader became backup.
                             if (entry.markObsolete(ver))
                                 removeEntry(entry);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
index a6d612a..e8c5db1 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
@@ -141,6 +142,7 @@ public class GridCacheAtomicMessageCountSelfTest extends 
GridCommonAbstractTest
             commSpi.registerMessage(GridNearAtomicSingleUpdateRequest.class);
             commSpi.registerMessage(GridNearAtomicFullUpdateRequest.class);
             commSpi.registerMessage(GridDhtAtomicUpdateRequest.class);
+            commSpi.registerMessage(GridDhtAtomicSingleUpdateRequest.class);
 
             int putCnt = 15;
 
@@ -171,7 +173,7 @@ public class GridCacheAtomicMessageCountSelfTest extends 
GridCommonAbstractTest
 
             assertEquals(expNearCnt, 
commSpi.messageCount(GridNearAtomicFullUpdateRequest.class));
             assertEquals(expNearSingleCnt, 
commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class));
-            assertEquals(expDhtCnt, 
commSpi.messageCount(GridDhtAtomicUpdateRequest.class));
+            assertEquals(expDhtCnt, 
commSpi.messageCount(GridDhtAtomicSingleUpdateRequest.class));
 
             if (writeOrderMode == CLOCK) {
                 for (int i = 1; i < 4; i++) {
@@ -179,7 +181,7 @@ public class GridCacheAtomicMessageCountSelfTest extends 
GridCommonAbstractTest
 
                     assertEquals(0, 
commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class));
                     assertEquals(0, 
commSpi.messageCount(GridNearAtomicFullUpdateRequest.class));
-                    assertEquals(0, 
commSpi.messageCount(GridDhtAtomicUpdateRequest.class));
+                    assertEquals(0, 
commSpi.messageCount(GridDhtAtomicSingleUpdateRequest.class));
                 }
             }
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/51ca24f2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index 0899423..644e310 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -478,7 +478,7 @@ public class 
GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
             Object origMsg = msg.message();
 
             return delay &&
-                ((origMsg instanceof GridNearAtomicAbstractUpdateRequest) || 
(origMsg instanceof GridDhtAtomicUpdateRequest));
+                ((origMsg instanceof GridNearAtomicAbstractUpdateRequest) || 
(origMsg instanceof GridDhtAtomicAbstractUpdateRequest));
         }
     }
 }
\ No newline at end of file

Reply via email to