Repository: ignite Updated Branches: refs/heads/ignite-2523-1-resp-dht [created] bb97c217c
Simplifying DHT request/response APIs. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9833dab5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9833dab5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9833dab5 Branch: refs/heads/ignite-2523-1-resp-dht Commit: 9833dab563e6b33cc3856e5e089eaf6a22bde4d8 Parents: f163aba Author: vozerov-gridgain <[email protected]> Authored: Thu Apr 28 14:45:37 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Thu Apr 28 14:45:37 2016 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 33 ++++++++++++-------- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 11 ++++--- .../dht/atomic/GridDhtAtomicUpdateResponse.java | 33 +++++++++++++++----- .../distributed/near/GridNearAtomicCache.java | 5 +-- 5 files changed, 55 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9833dab5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 2c0417c..a093fa5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -3066,7 +3066,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { ((GridNearAtomicCache<K, 
V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res); try { - if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) + if (res.failedCount() > 0 || res.nearEvictedCount() > 0 || req.writeSynchronizationMode() == FULL_SYNC) ctx.io().send(nodeId, res, ctx.ioPolicy()); else { // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response. http://git-wip-us.apache.org/repos/asf/ignite/blob/9833dab5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 82238e0..0043bf1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -43,7 +44,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ 
-412,24 +412,31 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> * @param nodeId Backup node ID. * @param updateRes Update response. */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) { if (log.isDebugEnabled()) log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']'); - if (updateRes.error() != null) - this.updateRes.addFailedKeys(updateRes.failedKeys(), updateRes.error()); + if (updateRes.error() != null) { + List<KeyCacheObject> failed = new ArrayList<>(updateRes.failedCount()); - if (!F.isEmpty(updateRes.nearEvicted())) { - for (KeyCacheObject key : updateRes.nearEvicted()) { - GridDhtCacheEntry entry = nearReadersEntries.get(key); + for (int i = 0; i < updateRes.failedCount(); i++) + failed.add(updateRes.failed(i)); - try { - entry.removeReader(nodeId, updateRes.messageId()); - } - catch (GridCacheEntryRemovedException e) { - if (log.isDebugEnabled()) - log.debug("Entry with evicted reader was removed [entry=" + entry + ", err=" + e + ']'); - } + this.updateRes.addFailedKeys(failed, updateRes.error()); + } + + for (int i = 0; i < updateRes.nearEvictedCount(); i++) { + KeyCacheObject key = updateRes.nearEvicted(i); + + GridDhtCacheEntry entry = nearReadersEntries.get(key); + + try { + entry.removeReader(nodeId, updateRes.messageId()); + } + catch (GridCacheEntryRemovedException e) { + if (log.isDebugEnabled()) + log.debug("Entry with evicted reader was removed [entry=" + entry + ", err=" + e + ']'); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9833dab5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 6b050b1..ed275ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.UUID; import javax.cache.processor.EntryProcessor; @@ -38,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -442,10 +442,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** - * @return Keys. + * Check whether the following key exists. + * + * @param key Key. + * @return {@code True} if it exists. 
*/ - public Collection<KeyCacheObject> keys() { - return keys; + public boolean hasKey(KeyCacheObject key) { + return F.contains(keys, key); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9833dab5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index 383e515..7099a4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; @@ -113,10 +112,20 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri } /** - * @return Failed keys. + * @return Amount of failed keys. */ - public Collection<KeyCacheObject> failedKeys() { - return failedKeys; + public int failedCount() { + return failedKeys != null ? failedKeys.size() : 0; + } + + /** + * Return failed key. + * + * @param idx Index. + * @return Failed key. + */ + public KeyCacheObject failed(int idx) { + return failedKeys.get(idx); } /** @@ -138,10 +147,20 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri } /** - * @return Evicted readers. + * @return Amount of near evicted keys. 
+ */ + public int nearEvictedCount() { + return nearEvicted != null ? nearEvicted.size() : 0; + } + + /** + * Return near evicted key. + * + * @param idx Index. + * @return Near evicted key. */ - public Collection<KeyCacheObject> nearEvicted() { - return nearEvicted; + public KeyCacheObject nearEvicted(int idx) { + return nearEvicted.get(idx); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9833dab5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 97be2eb..febf0b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.io.Externalizable; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -314,8 +313,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { assert ver != null; - Collection<KeyCacheObject> backupKeys = req.keys(); - boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null; String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); @@ -334,7 +331,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { break; } - if (F.contains(backupKeys, key)) { // Reader became backup. + if (req.hasKey(key)) { // Reader became backup. if (entry.markObsolete(ver)) removeEntry(entry);
