Refactored keys and vals collections.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f3b4fa26 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f3b4fa26 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f3b4fa26 Branch: refs/heads/ignite-2926 Commit: f3b4fa26319796baf4b94d7166ac49d262b3cf6d Parents: 2776cca Author: vozerov-gridgain <[email protected]> Authored: Thu Mar 31 17:08:46 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Thu Mar 31 17:08:46 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAtomicFuture.java | 5 - .../dht/atomic/GridDhtAtomicCache.java | 12 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 5 - .../GridNearAtomicSingleUpdateFuture.java | 174 +++++++------------ .../dht/atomic/GridNearAtomicUpdateFuture.java | 5 - 5 files changed, 71 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f3b4fa26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java index 359909e..c96d00f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java @@ -38,9 +38,4 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> { * @return Future or {@code null} if no need to wait. */ public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer); - - /** - * @return Future keys. - */ - public Collection<?> keys(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f3b4fa26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 00680ec..3ae43db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1063,19 +1063,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean waitTopFut ) { GridCacheOperation op; - Collection vals; + Object updVal; if (val != null) { op = UPDATE; - vals = Collections.singletonList(val); + updVal = val; } else if (proc != null) { op = TRANSFORM; - vals = Collections.singletonList(proc); + updVal = proc; } else { op = DELETE; - vals = null; + updVal = null; } CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -1085,8 +1085,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { this, ctx.config().getWriteSynchronizationMode(), op, - Collections.singletonList(key), - vals, + key, + updVal, invokeArgs, retval, opCtx != null ? opCtx.expiry() : null, http://git-wip-us.apache.org/repos/asf/ignite/blob/f3b4fa26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 9f52658..4721d6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -212,11 +212,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> return null; } - /** {@inheritDoc} */ - @Override public Collection<KeyCacheObject> keys() { - return keys; - } - /** * @param entry Entry to map. * @param val Value to write. http://git-wip-us.apache.org/repos/asf/ignite/blob/f3b4fa26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 4112308..8bb6ebe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -55,7 +55,6 @@ import javax.cache.expiry.ExpiryPolicy; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -64,6 +63,7 @@ import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; /** @@ -87,11 +87,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object> private final GridCacheOperation op; /** Keys */ - private Collection<?> keys; + private Object key; /** Values. */ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) - private Collection<?> vals; + private Object val; /** Optional arguments for entry processor. */ private Object[] invokeArgs; @@ -143,8 +143,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object> * @param cache Cache instance. * @param syncMode Write synchronization mode. * @param op Update operation. - * @param keys Keys to update. - * @param vals Values or transform closure. + * @param key Keys to update. + * @param val Values or transform closure. * @param invokeArgs Optional arguments for entry processor. * @param retval Return value require flag. * @param expiryPlc Expiry policy explicitly specified for cache operation. @@ -161,8 +161,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object> GridDhtAtomicCache cache, CacheWriteSynchronizationMode syncMode, GridCacheOperation op, - Collection<?> keys, - @Nullable Collection<?> vals, + Object key, + @Nullable Object val, @Nullable Object[] invokeArgs, final boolean retval, @Nullable ExpiryPolicy expiryPlc, @@ -174,15 +174,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object> int remapCnt, boolean waitTopFut ) { - assert vals == null || vals.size() == keys.size(); assert subjId != null; this.cctx = cctx; this.cache = cache; this.syncMode = syncMode; this.op = op; - this.keys = keys; - this.vals = vals; + this.key = key; + this.val = val; this.invokeArgs = invokeArgs; this.retval = retval; this.expiryPlc = expiryPlc; @@ -229,11 +228,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object> } /** {@inheritDoc} */ - @Override public Collection<?> keys() { - return keys; - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { state.onNodeLeft(nodeId); @@ -818,8 +812,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object> GridNearAtomicUpdateRequest singleReq0 = null; Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null; - int size = keys.size(); - GridCacheVersion futVer = cctx.versions().next(topVer); GridCacheVersion updVer; @@ -839,7 +831,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object> updVer = null; try { - if (size == 1 && !fastMap) { + if (!fastMap) { assert remapKeys == null || remapKeys.size() == 1; singleReq0 = mapSingleUpdate(topVer, futVer, updVer); @@ -865,7 +857,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object> else mappings0 = pendingMappings; - assert !mappings0.isEmpty() || size == 0 : GridNearAtomicSingleUpdateFuture.this; + assert !mappings0.isEmpty() : GridNearAtomicSingleUpdateFuture.this; } } @@ -909,10 +901,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object> else { assert mappings0 != null; - if (size == 0) - onDone(new GridCacheReturn(cctx, true, true, null, true)); - else - doUpdate(mappings0); + doUpdate(mappings0); } } @@ -972,93 +961,68 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object> GridCacheVersion futVer, @Nullable GridCacheVersion updVer, @Nullable Collection<KeyCacheObject> remapKeys) throws Exception { - Iterator<?> it = null; - - if (vals != null) - it = vals.iterator(); - Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size()); - // Create mappings first, then send messages. - for (Object key : keys) { - if (key == null) - throw new NullPointerException("Null key."); - - Object val; - - if (vals != null) { - val = it.next(); - - if (val == null) - throw new NullPointerException("Null value."); - } - else { - val = null; - } - - if (val == null && op != GridCacheOperation.DELETE) - continue; - + if (val != null || op == DELETE) { KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); - if (remapKeys != null && !remapKeys.contains(cacheKey)) - continue; - - if (op != TRANSFORM) - val = cctx.toCacheObject(val); - - Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); - - if (affNodes.isEmpty()) - throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid)."); + if (remapKeys == null || remapKeys.contains(cacheKey)) { + if (op != TRANSFORM) + val = cctx.toCacheObject(val); - int i = 0; + Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); - for (ClusterNode affNode : affNodes) { - if (affNode == null) + if (affNodes.isEmpty()) throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + "(all partition nodes left the grid)."); - UUID nodeId = affNode.id(); - - GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); - - if (mapped == null) { - mapped = new GridNearAtomicUpdateRequest( - cctx.cacheId(), - nodeId, - futVer, - fastMap, - updVer, - topVer, - topLocked, - syncMode, - op, - retval, - expiryPlc, - invokeArgs, - filter, - subjId, - taskNameHash, - skipStore, - keepBinary, - cctx.kernalContext().clientNode(), - cctx.deploymentEnabled(), - keys.size()); - - pendingMappings.put(nodeId, mapped); - } + int i = 0; + + for (ClusterNode affNode : affNodes) { + if (affNode == null) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid)."); + + UUID nodeId = affNode.id(); + + GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); + + if (mapped == null) { + mapped = new GridNearAtomicUpdateRequest( + cctx.cacheId(), + nodeId, + futVer, + fastMap, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled(), + 1); + + pendingMappings.put(nodeId, mapped); + } - mapped.addUpdateEntry(cacheKey, - val, - CU.TTL_NOT_CHANGED, - CU.EXPIRE_TIME_CALCULATE, - null, - i == 0 - ); + mapped.addUpdateEntry(cacheKey, + val, + CU.TTL_NOT_CHANGED, + CU.EXPIRE_TIME_CALCULATE, + null, + i == 0 + ); - i++; + i++; + } } } @@ -1075,16 +1039,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object> private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, GridCacheVersion futVer, @Nullable GridCacheVersion updVer) throws Exception { - Object key = F.first(keys); - - Object val; - - if (vals != null) - // Regular PUT. - val = F.first(vals); - else - // Regular REMOVE. - val = null; + Object key = GridNearAtomicSingleUpdateFuture.this.key; + Object val = GridNearAtomicSingleUpdateFuture.this.val; // We still can get here if user pass map with single element. if (key == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/f3b4fa26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 69e6274..c33df07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -251,11 +251,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem } /** {@inheritDoc} */ - @Override public Collection<?> keys() { - return keys; - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { state.onNodeLeft(nodeId);
