Reworked "remapKeys" logic.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dfae05a6 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dfae05a6 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dfae05a6 Branch: refs/heads/ignite-2926 Commit: dfae05a6bae2e2ef9d02fc1bb37f6a7208a338f3 Parents: 61bdf4c Author: vozerov-gridgain <[email protected]> Authored: Fri Apr 15 12:42:40 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Apr 15 12:42:40 2016 +0300 ---------------------------------------------------------------------- .../GridNearAtomicSingleUpdateFuture.java | 127 ++++++++----------- 1 file changed, 55 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/dfae05a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 6c1a402..fc85db3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -77,7 +77,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda private Map<UUID, GridNearAtomicUpdateRequest> mappings; /** Keys to remap. */ - private Collection<KeyCacheObject> remapKeys; + private boolean remapKey; /** Not null is operation is mapped to single node. */ private GridNearAtomicUpdateRequest singleReq; @@ -189,7 +189,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda // Cannot remap. remapCnt = 1; - map(topVer, null); + map(topVer); } } @@ -299,10 +299,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda if (res.remapKeys() != null) { assert !fastMap || cctx.kernalContext().clientNode(); - if (remapKeys == null) - remapKeys = U.newHashSet(res.remapKeys().size()); - - remapKeys.addAll(res.remapKeys()); + remapKey = true; if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) mapErrTopVer = req.topologyVersion(); @@ -343,7 +340,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda } if (rcvAll) { - if (remapKeys != null) { + if (remapKey) { assert mapErrTopVer != null; remapTopVer = cctx.shared().exchange().topologyVersion(); @@ -368,12 +365,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda err = null; - Collection<Object> failedKeys = cause.failedKeys(); - - remapKeys = new ArrayList<>(failedKeys.size()); - - for (Object key : failedKeys) - remapKeys.add(cctx.toCacheKeyObject(key)); + remapKey = true; updVer = null; } @@ -428,7 +420,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda } if (topLocked) { - assert !F.isEmpty(remapKeys) : remapKeys; + assert remapKey; CachePartialUpdateCheckedException e = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); @@ -438,7 +430,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); - e.add(remapKeys, cause); + e.add(Collections.singleton(keys), cause); onDone(e); @@ -458,7 +450,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda try { AffinityTopologyVersion topVer = fut.get(); - map(topVer, remapKeys); + map(topVer); } catch (IgniteCheckedException e) { onDone(e); @@ -545,7 +537,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda cache.topology().readUnlock(); } - map(topVer, null); + map(topVer); } /** @@ -642,9 +634,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda /** * @param topVer Topology version. - * @param remapKeys Keys to remap. */ - void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) { + void map(AffinityTopologyVersion topVer) { Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { @@ -677,17 +668,13 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda updVer = null; try { - if (!fastMap) { - assert remapKeys == null || remapKeys.size() == 1; - + if (!fastMap) singleReq0 = mapSingleUpdate(topVer, futVer, updVer); - } else { Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes, topVer, futVer, - updVer, - remapKeys); + updVer); if (pendingMappings.size() == 1) singleReq0 = F.firstValue(pendingMappings); @@ -720,7 +707,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda singleReq = singleReq0; mappings = mappings0; - this.remapKeys = null; + this.remapKey = false; } } catch (Exception e) { @@ -780,15 +767,13 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda * @param topVer Topology version. * @param futVer Future version. * @param updVer Update version. - * @param remapKeys Keys to remap. * @return Mapping. * @throws Exception If failed. */ private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes, AffinityTopologyVersion topVer, GridCacheVersion futVer, - @Nullable GridCacheVersion updVer, - @Nullable Collection<KeyCacheObject> remapKeys) throws Exception { + @Nullable GridCacheVersion updVer) throws Exception { Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size()); if (keys == null) @@ -801,57 +786,55 @@ public class GridNearAtomicSingleUpdateFuture extends GridAbstractNearAtomicUpda KeyCacheObject cacheKey = cctx.toCacheKeyObject(keys); - if (remapKeys == null || remapKeys.contains(cacheKey)) { - if (op != TRANSFORM) - val = cctx.toCacheObject(val); + if (op != TRANSFORM) + val = cctx.toCacheObject(val); + + Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer); + + if (affNodes.isEmpty()) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid)."); - Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer); + int i = 0; - if (affNodes.isEmpty()) + for (ClusterNode affNode : affNodes) { + if (affNode == null) throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + "(all partition nodes left the grid)."); - int i = 0; - - for (ClusterNode affNode : affNodes) { - if (affNode == null) - throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid)."); - - UUID nodeId = affNode.id(); - - GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); - - if (mapped == null) { - mapped = new GridNearAtomicUpdateRequest( - cctx.cacheId(), - nodeId, - futVer, - fastMap, - updVer, - topVer, - topLocked, - syncMode, - op, - retval, - expiryPlc, - invokeArgs, - filter, - subjId, - taskNameHash, - skipStore, - keepBinary, - cctx.kernalContext().clientNode(), - cctx.deploymentEnabled(), - 1); - - pendingMappings.put(nodeId, mapped); - } + UUID nodeId = affNode.id(); - mapped.addUpdateEntry(cacheKey, val, CU.TTL_NOT_CHANGED, CU.EXPIRE_TIME_CALCULATE, null, i == 0); + GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); - i++; + if (mapped == null) { + mapped = new GridNearAtomicUpdateRequest( + cctx.cacheId(), + nodeId, + futVer, + fastMap, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled(), + 1); + + pendingMappings.put(nodeId, mapped); } + + mapped.addUpdateEntry(cacheKey, val, CU.TTL_NOT_CHANGED, CU.EXPIRE_TIME_CALCULATE, null, i == 0); + + i++; } return pendingMappings;
