Repository: ignite
Updated Branches:
  refs/heads/ignite-2523-1 5d4af1a16 -> 5866b999f


Refactoring response.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/091f5d10
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/091f5d10
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/091f5d10

Branch: refs/heads/ignite-2523-1
Commit: 091f5d10ddc17a5a45a08e62a3c31bece63b1869
Parents: 10387a7
Author: vozerov-gridgain <[email protected]>
Authored: Tue Apr 26 12:22:21 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Tue Apr 26 12:22:21 2016 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 21 +++--
 .../GridNearAtomicSingleUpdateFuture.java       | 18 ++--
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 23 ++---
 .../atomic/GridNearAtomicUpdateResponse.java    | 93 +++++++++++++-------
 .../distributed/near/GridNearAtomicCache.java   | 23 +++--
 5 files changed, 108 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/091f5d10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c48787c..37a5f45 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -210,12 +210,17 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 if (req.writeSynchronizationMode() != FULL_ASYNC)
                     sendNearUpdateReply(res.nodeId(), res);
                 else {
-                    if (!F.isEmpty(res.remapKeys()))
+                    if (res.remapKeysCount() > 0)
                         // Remap keys on primary node in FULL_ASYNC mode.
                         remapToNewPrimary(req);
                     else if (res.error() != null) {
+                        Collection<KeyCacheObject> failedKeys = new 
ArrayList<>(res.failedKeysCount());
+
+                        for (int i = 0; i < res.failedKeysCount(); i++)
+                            failedKeys.add(res.failedKey(i));
+
                         U.error(log, "Failed to process write update request 
in FULL_ASYNC mode for keys: " +
-                            res.failedKeys(), res.error());
+                            failedKeys, res.error());
                     }
                 }
             }
@@ -1526,7 +1531,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
                 try {
                     if (top.stopping()) {
-                        res.addFailedKeys(req.keys(), new 
IgniteCheckedException("Failed to perform cache operation " +
+                        res.addFailedKeys(req, new 
IgniteCheckedException("Failed to perform cache operation " +
                             "(cache is stopped): " + name()));
 
                         completionCb.apply(req, res);
@@ -1669,7 +1674,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             // an attempt to use cleaned resources.
             U.error(log, "Unexpected exception during cache update", e);
 
-            res.addFailedKeys(req.keys(), e);
+            res.addFailedKeys(req, e);
 
             completionCb.apply(req, res);
 
@@ -1682,7 +1687,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         if (remap) {
             assert dhtFut == null;
 
-            res.remapKeys(req.keys());
+            res.remapKeys(req);
 
             completionCb.apply(req, res);
         }
@@ -1739,7 +1744,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
                 reloadIfNeeded(locked);
             }
             catch (IgniteCheckedException e) {
-                res.addFailedKeys(req.keys(), e);
+                res.addFailedKeys(req, e);
 
                 return new UpdateBatchResult();
             }
@@ -2593,7 +2598,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             }
         }
         catch (IgniteCheckedException e) {
-            res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e, 
ctx);
+            res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e);
         }
 
         if (storeErr != null) {
@@ -2602,7 +2607,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
             for (Object failedKey : storeErr.failedKeys())
                 failed.add(ctx.toCacheKeyObject(failedKey));
 
-            res.addFailedKeys(failed, storeErr.getCause(), ctx);
+            res.addFailedKeys(failed, storeErr.getCause());
         }
 
         return dhtFut;

http://git-wip-us.apache.org/repos/asf/ignite/blob/091f5d10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index d8c217e..34399ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -163,7 +163,7 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
 
                 
e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
 
-                res.addFailedKeys(req.keys(), e);
+                res.addFailedKeys(req, e);
             }
         }
 
@@ -233,22 +233,22 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
 
             this.req = null;
 
-            boolean remapKey = !F.isEmpty(res.remapKeys());
+            boolean remapKey = res.remapKeysCount() > 0;
 
             if (remapKey) {
                 if (mapErrTopVer == null || 
mapErrTopVer.compareTo(req.topologyVersion()) < 0)
                     mapErrTopVer = req.topologyVersion();
             }
             else if (res.error() != null) {
-                if (res.failedKeys() != null) {
+                if (res.failedKeysCount() > 0) {
                     if (err == null)
                         err = new CachePartialUpdateCheckedException(
                             "Failed to update keys (retry update if 
possible).");
 
-                    Collection<Object> keys = new 
ArrayList<>(res.failedKeys().size());
+                    Collection<Object> keys = new 
ArrayList<>(res.failedKeysCount());
 
-                    for (KeyCacheObject key : res.failedKeys())
-                        
keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, 
false));
+                    for (int i = 0; i < res.failedKeysCount(); i++)
+                        
keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(res.failedKey(i), 
keepBinary, false));
 
                     err.add(keys, res.error(), req.topologyVersion());
                 }
@@ -319,7 +319,7 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
             }
         }
 
-        if (res.error() != null && res.failedKeys() == null) {
+        if (res.error() != null && res.failedKeysCount() == 0) {
             onDone(res.error());
 
             return;
@@ -392,7 +392,7 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
     private void updateNear(GridNearAtomicAbstractUpdateRequest req, 
GridNearAtomicUpdateResponse res) {
         assert nearEnabled;
 
-        if (res.remapKeys() != null || !req.hasPrimary())
+        if (res.remapKeysCount() > 0 || !req.hasPrimary())
             return;
 
         GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
@@ -496,7 +496,7 @@ public class GridNearAtomicSingleUpdateFuture extends 
GridNearAtomicAbstractUpda
                 req.futureVersion(),
                 cctx.deploymentEnabled());
 
-            res.addFailedKeys(req.keys(), e);
+            res.addFailedKeys(req, e);
 
             onResult(req.nodeId(), res, true);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/091f5d10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 7021e2f..bad4647 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -185,7 +185,7 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
 
                 
e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
 
-                res.addFailedKeys(req.keys(), e);
+                res.addFailedKeys(req, e);
             }
         }
 
@@ -298,27 +298,28 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
 
             assert req != null && req.topologyVersion().equals(topVer) : req;
 
-            if (res.remapKeys() != null) {
+            if (res.remapKeysCount() > 0) {
                 assert !fastMap || cctx.kernalContext().clientNode();
 
                 if (remapKeys == null)
-                    remapKeys = U.newHashSet(res.remapKeys().size());
+                    remapKeys = U.newHashSet(res.remapKeysCount());
 
-                remapKeys.addAll(res.remapKeys());
+                for (int i = 0; i < res.remapKeysCount(); i++)
+                    remapKeys.add(res.remapKey(i));
 
                 if (mapErrTopVer == null || 
mapErrTopVer.compareTo(req.topologyVersion()) < 0)
                     mapErrTopVer = req.topologyVersion();
             }
             else if (res.error() != null) {
-                if (res.failedKeys() != null) {
+                if (res.failedKeysCount() > 0) {
                     if (err == null)
                         err = new CachePartialUpdateCheckedException(
                             "Failed to update keys (retry update if 
possible).");
 
-                    Collection<Object> keys = new 
ArrayList<>(res.failedKeys().size());
+                    Collection<Object> keys = new 
ArrayList<>(res.failedKeysCount());
 
-                    for (KeyCacheObject key : res.failedKeys())
-                        
keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, 
false));
+                    for (int i = 0; i < res.failedKeysCount(); i++)
+                        
keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(res.failedKey(i), 
keepBinary, false));
 
                     err.add(keys, res.error(), req.topologyVersion());
                 }
@@ -399,7 +400,7 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
             }
         }
 
-        if (res.error() != null && res.failedKeys() == null) {
+        if (res.error() != null && res.failedKeysCount() == 0) {
             onDone(res.error());
 
             return;
@@ -486,7 +487,7 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
     private void updateNear(GridNearAtomicUpdateRequest req, 
GridNearAtomicUpdateResponse res) {
         assert nearEnabled;
 
-        if (res.remapKeys() != null || !req.hasPrimary())
+        if (res.remapKeysCount() > 0 || !req.hasPrimary())
             return;
 
         GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
@@ -634,7 +635,7 @@ public class GridNearAtomicUpdateFuture extends 
GridNearAtomicAbstractUpdateFutu
                 req.futureVersion(),
                 cctx.deploymentEnabled());
 
-            res.addFailedKeys(req.keys(), e);
+            res.addFailedKeys(req, e);
 
             onResult(req.nodeId(), res, true);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/091f5d10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index f47bb75..bc552df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectTransient;
@@ -75,7 +74,7 @@ public class GridNearAtomicUpdateResponse extends 
GridCacheMessage implements Gr
     /** Failed keys. */
     @GridToStringInclude
     @GridDirectCollection(KeyCacheObject.class)
-    private volatile Collection<KeyCacheObject> failedKeys;
+    private volatile List<KeyCacheObject> failedKeys;
 
     /** Keys that should be remapped. */
     @GridToStringInclude
@@ -167,10 +166,18 @@ public class GridNearAtomicUpdateResponse extends 
GridCacheMessage implements Gr
     }
 
     /**
-     * @return Collection of failed keys.
+     * @return Failed keys count.
      */
-    public Collection<KeyCacheObject> failedKeys() {
-        return failedKeys;
+    public int failedKeysCount() {
+        return failedKeys == null ? 0 : failedKeys.size();
+    }
+
+    /**
+     * @param idx Index.
+     * @return Failed key.
+     */
+    public KeyCacheObject failedKey(int idx) {
+        return failedKeys.get(idx);
     }
 
     /**
@@ -189,17 +196,28 @@ public class GridNearAtomicUpdateResponse extends 
GridCacheMessage implements Gr
     }
 
     /**
-     * @param remapKeys Remap keys.
+     * @param req Request.
+     */
+    public void remapKeys(GridNearAtomicAbstractUpdateRequest req) {
+        remapKeys = new ArrayList<>(req.keysCount());
+
+        for (int i = 0; i < req.keysCount(); i++)
+            remapKeys.add(req.key(i));
+    }
+
+    /**
+     * @param idx Index.
+     * @return Remap key.
      */
-    public void remapKeys(List<KeyCacheObject> remapKeys) {
-        this.remapKeys = remapKeys;
+    public KeyCacheObject remapKey(int idx) {
+        return remapKeys.get(idx);
     }
 
     /**
-     * @return Remap keys.
+     * @return Remap keys count.
      */
-    public Collection<KeyCacheObject> remapKeys() {
-        return remapKeys;
+    public int remapKeysCount() {
+        return remapKeys == null ? 0 : remapKeys.size();
     }
 
     /**
@@ -312,18 +330,24 @@ public class GridNearAtomicUpdateResponse extends 
GridCacheMessage implements Gr
     }
 
     /**
-     * @return Indexes of keys for which update was skipped
+     * Check if update was skipped for the given index.
+     *
+     * @param idx Index.
+     * @return {@code True} if skipped.
      */
-    @Nullable public List<Integer> skippedIndexes() {
-        return nearSkipIdxs;
+    public boolean isNearSkippedIndex(int idx) {
+        return nearSkipIdxs != null && nearSkipIdxs.contains(idx);
     }
 
     /**
-     * @return Indexes of keys for which values were generated on primary node.
+     * Check if this is an index of a key for which values were generated on 
primary node.
+     *
+     * @param idx Index.
+     * @return {@code True} if values were generated on primary node.
      */
-   @Nullable public List<Integer> nearValuesIndexes() {
-        return nearValsIdxs;
-   }
+    public boolean isNearValueIndex(int idx) {
+        return nearValsIdxs != null && nearValsIdxs.contains(idx);
+    }
 
     /**
      * @param idx Index.
@@ -341,14 +365,11 @@ public class GridNearAtomicUpdateResponse extends 
GridCacheMessage implements Gr
      */
     public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
         if (failedKeys == null)
-            failedKeys = new ConcurrentLinkedQueue<>();
+            failedKeys = new ArrayList<>(1);
 
         failedKeys.add(key);
 
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary 
node.");
-
-        err.addSuppressed(e);
+        setFailedKeysError(e);
     }
 
     /**
@@ -365,25 +386,31 @@ public class GridNearAtomicUpdateResponse extends 
GridCacheMessage implements Gr
             failedKeys.addAll(keys);
         }
 
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary 
node.");
-
-        err.addSuppressed(e);
+        setFailedKeysError(e);
     }
 
     /**
-     * Adds keys to collection of failed keys.
+     * Add keys to collection of failed keys.
      *
-     * @param keys Key to add.
+     * @param req Request.
      * @param e Error cause.
-     * @param ctx Context.
      */
-    public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, 
Throwable e, GridCacheContext ctx) {
+    public synchronized void addFailedKeys(GridNearAtomicAbstractUpdateRequest 
req, Throwable e) {
         if (failedKeys == null)
-            failedKeys = new ArrayList<>(keys.size());
+            failedKeys = new ArrayList<>(req.keysCount());
 
-        failedKeys.addAll(keys);
+        for (int i = 0; i < req.keysCount(); i++)
+            failedKeys.add(req.key(i));
+
+        setFailedKeysError(e);
+    }
 
+    /**
+     * Set failed keys error.
+     *
+     * @param e Error.
+     */
+    private void setFailedKeysError(Throwable e) {
         if (err == null)
             err = new IgniteCheckedException("Failed to update keys on primary 
node.");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/091f5d10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index ac87ead..ac1ef70 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -129,18 +129,13 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
         GridNearAtomicAbstractUpdateRequest req,
         GridNearAtomicUpdateResponse res
     ) {
-        if (F.size(res.failedKeys()) == req.keysCount())
+        if (res.failedKeysCount() == req.keysCount())
             return;
 
         /*
          * Choose value to be stored in near cache: first check key is not in 
failed and not in skipped list,
          * then check if value was generated on primary node, if not then use 
value sent in request.
          */
-
-        Collection<KeyCacheObject> failed = res.failedKeys();
-        List<Integer> nearValsIdxs = res.nearValuesIndexes();
-        List<Integer> skipped = res.skippedIndexes();
-
         GridCacheVersion ver = req.updateVersion();
 
         if (ver == null)
@@ -153,12 +148,22 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
         String taskName = 
ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
 
         for (int i = 0; i < req.keysCount(); i++) {
-            if (F.contains(skipped, i))
+            if (res.isNearSkippedIndex(i))
                 continue;
 
             KeyCacheObject key = req.key(i);
 
-            if (F.contains(failed, key))
+            boolean failed = false;
+
+            for (int j = 0; j < res.failedKeysCount(); j++) {
+                if (F.eq(res.failedKey(j), key)) {
+                    failed = true;
+
+                    break;
+                }
+            }
+
+            if (failed)
                 continue;
 
             if (ctx.affinity().belongs(ctx.localNode(), 
ctx.affinity().partition(key), req.topologyVersion())) { // Reader became 
backup.
@@ -172,7 +177,7 @@ public class GridNearAtomicCache<K, V> extends 
GridNearCacheAdapter<K, V> {
 
             CacheObject val = null;
 
-            if (F.contains(nearValsIdxs, i)) {
+            if (res.isNearValueIndex(i)) {
                 val = res.nearValue(nearValIdx);
 
                 nearValIdx++;

Reply via email to