Repository: ignite Updated Branches: refs/heads/ignite-2523-1 5d4af1a16 -> 5866b999f
Refactoring response. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/091f5d10 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/091f5d10 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/091f5d10 Branch: refs/heads/ignite-2523-1 Commit: 091f5d10ddc17a5a45a08e62a3c31bece63b1869 Parents: 10387a7 Author: vozerov-gridgain <[email protected]> Authored: Tue Apr 26 12:22:21 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Apr 26 12:22:21 2016 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 21 +++-- .../GridNearAtomicSingleUpdateFuture.java | 18 ++-- .../dht/atomic/GridNearAtomicUpdateFuture.java | 23 ++--- .../atomic/GridNearAtomicUpdateResponse.java | 93 +++++++++++++------- .../distributed/near/GridNearAtomicCache.java | 23 +++-- 5 files changed, 108 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/091f5d10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index c48787c..37a5f45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -210,12 +210,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (req.writeSynchronizationMode() != FULL_ASYNC) sendNearUpdateReply(res.nodeId(), res); else { - if (!F.isEmpty(res.remapKeys())) + if (res.remapKeysCount() > 0) // Remap keys on primary node in FULL_ASYNC mode. remapToNewPrimary(req); else if (res.error() != null) { + Collection<KeyCacheObject> failedKeys = new ArrayList<>(res.failedKeysCount()); + + for (int i = 0; i < res.failedKeysCount(); i++) + failedKeys.add(res.failedKey(i)); + U.error(log, "Failed to process write update request in FULL_ASYNC mode for keys: " + - res.failedKeys(), res.error()); + failedKeys, res.error()); } } } @@ -1526,7 +1531,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { try { if (top.stopping()) { - res.addFailedKeys(req.keys(), new IgniteCheckedException("Failed to perform cache operation " + + res.addFailedKeys(req, new IgniteCheckedException("Failed to perform cache operation " + "(cache is stopped): " + name())); completionCb.apply(req, res); @@ -1669,7 +1674,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // an attempt to use cleaned resources. U.error(log, "Unexpected exception during cache update", e); - res.addFailedKeys(req.keys(), e); + res.addFailedKeys(req, e); completionCb.apply(req, res); @@ -1682,7 +1687,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (remap) { assert dhtFut == null; - res.remapKeys(req.keys()); + res.remapKeys(req); completionCb.apply(req, res); } @@ -1739,7 +1744,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { reloadIfNeeded(locked); } catch (IgniteCheckedException e) { - res.addFailedKeys(req.keys(), e); + res.addFailedKeys(req, e); return new UpdateBatchResult(); } @@ -2593,7 +2598,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } catch (IgniteCheckedException e) { - res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e, ctx); + res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e); } if (storeErr != null) { @@ -2602,7 +2607,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { for (Object failedKey : storeErr.failedKeys()) failed.add(ctx.toCacheKeyObject(failedKey)); - res.addFailedKeys(failed, storeErr.getCause(), ctx); + res.addFailedKeys(failed, storeErr.getCause()); } return dhtFut; http://git-wip-us.apache.org/repos/asf/ignite/blob/091f5d10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index d8c217e..34399ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -163,7 +163,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion())); - res.addFailedKeys(req.keys(), e); + res.addFailedKeys(req, e); } } @@ -233,22 +233,22 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda this.req = null; - boolean remapKey = !F.isEmpty(res.remapKeys()); + boolean remapKey = res.remapKeysCount() > 0; if (remapKey) { if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) mapErrTopVer = req.topologyVersion(); } else if (res.error() != null) { - if (res.failedKeys() != null) { + if (res.failedKeysCount() > 0) { if (err == null) err = new CachePartialUpdateCheckedException( "Failed to update keys (retry update if possible)."); - Collection<Object> keys = new ArrayList<>(res.failedKeys().size()); + Collection<Object> keys = new ArrayList<>(res.failedKeysCount()); - for (KeyCacheObject key : res.failedKeys()) - keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); + for (int i = 0; i < res.failedKeysCount(); i++) + keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(res.failedKey(i), keepBinary, false)); err.add(keys, res.error(), req.topologyVersion()); } @@ -319,7 +319,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } } - if (res.error() != null && res.failedKeys() == null) { + if (res.error() != null && res.failedKeysCount() == 0) { onDone(res.error()); return; @@ -392,7 +392,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { assert nearEnabled; - if (res.remapKeys() != null || !req.hasPrimary()) + if (res.remapKeysCount() > 0 || !req.hasPrimary()) return; GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); @@ -496,7 +496,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda req.futureVersion(), cctx.deploymentEnabled()); - res.addFailedKeys(req.keys(), e); + res.addFailedKeys(req, e); onResult(req.nodeId(), res, true); } http://git-wip-us.apache.org/repos/asf/ignite/blob/091f5d10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 7021e2f..bad4647 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -185,7 +185,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion())); - res.addFailedKeys(req.keys(), e); + res.addFailedKeys(req, e); } } @@ -298,27 +298,28 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu assert req != null && req.topologyVersion().equals(topVer) : req; - if (res.remapKeys() != null) { + if (res.remapKeysCount() > 0) { assert !fastMap || cctx.kernalContext().clientNode(); if (remapKeys == null) - remapKeys = U.newHashSet(res.remapKeys().size()); + remapKeys = U.newHashSet(res.remapKeysCount()); - remapKeys.addAll(res.remapKeys()); + for (int i = 0; i < res.remapKeysCount(); i++) + remapKeys.add(res.remapKey(i)); if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) mapErrTopVer = req.topologyVersion(); } else if (res.error() != null) { - if (res.failedKeys() != null) { + if (res.failedKeysCount() > 0) { if (err == null) err = new CachePartialUpdateCheckedException( "Failed to update keys (retry update if possible)."); - Collection<Object> keys = new ArrayList<>(res.failedKeys().size()); + Collection<Object> keys = new ArrayList<>(res.failedKeysCount()); - for (KeyCacheObject key : res.failedKeys()) - keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); + for (int i = 0; i < res.failedKeysCount(); i++) + keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(res.failedKey(i), keepBinary, false)); err.add(keys, res.error(), req.topologyVersion()); } @@ -399,7 +400,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } } - if (res.error() != null && res.failedKeys() == null) { + if (res.error() != null && res.failedKeysCount() == 0) { onDone(res.error()); return; @@ -486,7 +487,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { assert nearEnabled; - if (res.remapKeys() != null || !req.hasPrimary()) + if (res.remapKeysCount() > 0 || !req.hasPrimary()) return; GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); @@ -634,7 +635,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu req.futureVersion(), cctx.deploymentEnabled()); - res.addFailedKeys(req.keys(), e); + res.addFailedKeys(req, e); onResult(req.nodeId(), res, true); } http://git-wip-us.apache.org/repos/asf/ignite/blob/091f5d10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index f47bb75..bc552df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; @@ -75,7 +74,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** Failed keys. */ @GridToStringInclude @GridDirectCollection(KeyCacheObject.class) - private volatile Collection<KeyCacheObject> failedKeys; + private volatile List<KeyCacheObject> failedKeys; /** Keys that should be remapped. */ @GridToStringInclude @@ -167,10 +166,18 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** - * @return Collection of failed keys. + * @return Failed keys count. */ - public Collection<KeyCacheObject> failedKeys() { - return failedKeys; + public int failedKeysCount() { + return failedKeys == null ? 0 : failedKeys.size(); + } + + /** + * @param idx Index. + * @return Failed key. + */ + public KeyCacheObject failedKey(int idx) { + return failedKeys.get(idx); } /** @@ -189,17 +196,28 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** - * @param remapKeys Remap keys. + * @param req Request. + */ + public void remapKeys(GridNearAtomicAbstractUpdateRequest req) { + remapKeys = new ArrayList<>(req.keysCount()); + + for (int i = 0; i < req.keysCount(); i++) + remapKeys.add(req.key(i)); + } + + /** + * @param idx Index. + * @return Remap key. */ - public void remapKeys(List<KeyCacheObject> remapKeys) { - this.remapKeys = remapKeys; + public KeyCacheObject remapKey(int idx) { + return remapKeys.get(idx); } /** - * @return Remap keys. + * @return Remap keys count. */ - public Collection<KeyCacheObject> remapKeys() { - return remapKeys; + public int remapKeysCount() { + return remapKeys == null ? 0 : remapKeys.size(); } /** @@ -312,18 +330,24 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** - * @return Indexes of keys for which update was skipped + * Check if update was skipped for the given index. + * + * @param idx Index. + * @return {@code True} if skipped. */ - @Nullable public List<Integer> skippedIndexes() { - return nearSkipIdxs; + public boolean isNearSkippedIndex(int idx) { + return nearSkipIdxs != null && nearSkipIdxs.contains(idx); } /** - * @return Indexes of keys for which values were generated on primary node. + * Check if this is an index of a key for which values were generated on primary node. + * + * @param idx Index. + * @return {@code True} if values were generated on primary node. */ - @Nullable public List<Integer> nearValuesIndexes() { - return nearValsIdxs; - } + public boolean isNearValueIndex(int idx) { + return nearValsIdxs != null && nearValsIdxs.contains(idx); + } /** * @param idx Index. @@ -341,14 +365,11 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr */ public synchronized void addFailedKey(KeyCacheObject key, Throwable e) { if (failedKeys == null) - failedKeys = new ConcurrentLinkedQueue<>(); + failedKeys = new ArrayList<>(1); failedKeys.add(key); - if (err == null) - err = new IgniteCheckedException("Failed to update keys on primary node."); - - err.addSuppressed(e); + setFailedKeysError(e); } /** @@ -365,25 +386,31 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr failedKeys.addAll(keys); } - if (err == null) - err = new IgniteCheckedException("Failed to update keys on primary node."); - - err.addSuppressed(e); + setFailedKeysError(e); } /** - * Adds keys to collection of failed keys. + * Add keys to collection of failed keys. * - * @param keys Key to add. + * @param req Request. * @param e Error cause. - * @param ctx Context. */ - public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e, GridCacheContext ctx) { + public synchronized void addFailedKeys(GridNearAtomicAbstractUpdateRequest req, Throwable e) { if (failedKeys == null) - failedKeys = new ArrayList<>(keys.size()); + failedKeys = new ArrayList<>(req.keysCount()); - failedKeys.addAll(keys); + for (int i = 0; i < req.keysCount(); i++) + failedKeys.add(req.key(i)); + + setFailedKeysError(e); + } + /** + * Set failed keys error. + * + * @param e Error. + */ + private void setFailedKeysError(Throwable e) { if (err == null) err = new IgniteCheckedException("Failed to update keys on primary node."); http://git-wip-us.apache.org/repos/asf/ignite/blob/091f5d10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index ac87ead..ac1ef70 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -129,18 +129,13 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res ) { - if (F.size(res.failedKeys()) == req.keysCount()) + if (res.failedKeysCount() == req.keysCount()) return; /* * Choose value to be stored in near cache: first check key is not in failed and not in skipped list, * then check if value was generated on primary node, if not then use value sent in request. */ - - Collection<KeyCacheObject> failed = res.failedKeys(); - List<Integer> nearValsIdxs = res.nearValuesIndexes(); - List<Integer> skipped = res.skippedIndexes(); - GridCacheVersion ver = req.updateVersion(); if (ver == null) @@ -153,12 +148,22 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); for (int i = 0; i < req.keysCount(); i++) { - if (F.contains(skipped, i)) + if (res.isNearSkippedIndex(i)) continue; KeyCacheObject key = req.key(i); - if (F.contains(failed, key)) + boolean failed = false; + + for (int j = 0; j < res.failedKeysCount(); j++) { + if (F.eq(res.failedKey(j), key)) { + failed = true; + + break; + } + } + + if (failed) continue; if (ctx.affinity().belongs(ctx.localNode(), ctx.affinity().partition(key), req.topologyVersion())) { // Reader became backup. @@ -172,7 +177,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { CacheObject val = null; - if (F.contains(nearValsIdxs, i)) { + if (res.isNearValueIndex(i)) { val = res.nearValue(nearValIdx); nearValIdx++;
