# ignite-1124
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a965d439 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a965d439 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a965d439 Branch: refs/heads/ignite-1124 Commit: a965d43948becaf75206714b59e7542de276bada Parents: 5665a7d Author: sboikov <[email protected]> Authored: Wed Aug 26 13:32:11 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed Aug 26 13:32:11 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 366 ++++++++++--------- .../IgniteCachePutRetryAbstractSelfTest.java | 5 +- 2 files changed, 199 insertions(+), 172 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a965d439/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index d0c8766..df485dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -82,9 +82,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private Collection<GridCacheVersion> conflictRmvVals; - /** Operation result. */ - private volatile GridCacheReturn opRes; - /** Return value require flag. */ private final boolean retval; @@ -376,6 +373,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } else { if (waitTopFut) { + assert !topLocked : this; + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @@ -507,16 +506,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * */ private class UpdateState { - /** */ + /** Current topology version. */ private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; /** */ private GridCacheVersion updVer; - /** */ + /** Topology version when got mapping error. */ private AffinityTopologyVersion mapErrTopVer; - /** Mappings. */ + /** Mappings if operations is mapped to more than one node. */ @GridToStringInclude private Map<UUID, GridNearAtomicUpdateRequest> mappings; @@ -529,32 +528,14 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** Completion future for a particular topology version. */ private GridFutureAdapter<Void> topCompleteFut; - /** */ + /** Keys to remap. */ private Collection<KeyCacheObject> remapKeys; - /** */ + /** Not null is operation is mapped to single node. */ private GridNearAtomicUpdateRequest singleReq; - /** - * @param failedKeys Failed keys. - * @param topVer Topology version for failed update. - * @param err Error cause. - */ - private void addFailedKeys(Collection<KeyCacheObject> failedKeys, - AffinityTopologyVersion topVer, - Throwable err) { - CachePartialUpdateCheckedException err0 = this.err; - - if (err0 == null) - err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); - - Collection<Object> keys = new ArrayList<>(failedKeys.size()); - - for (KeyCacheObject key : failedKeys) - keys.add(key.value(cctx.cacheObjectContext(), false)); - - err0.add(keys, err, topVer); - } + /** Operation result. */ + private GridCacheReturn opRes; /** * @return Future version. @@ -580,8 +561,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (req != null) { res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion()); - res.addFailedKeys(req.keys(), - new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId)); + res.addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before " + + "response is received: " + nodeId)); } } @@ -590,22 +571,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** - * @param ret Result from single node. - */ - @SuppressWarnings("unchecked") - private void addInvokeResults(GridCacheReturn ret) { - assert op == TRANSFORM : op; - assert ret.value() == null || ret.value() instanceof Map : ret.value(); - - if (ret.value() != null) { - if (opRes != null) - opRes.mergeEntryProcessResults(ret); - else - opRes = ret; - } - } - - /** * @param nodeId Node ID. * @param res Response. * @param nodeErr {@code True} if response was created on node failure. @@ -735,16 +700,40 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (fut0 != null) fut0.onDone(); - IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(remapTopVer); + if (!waitTopFut) { + onDone(new GridCacheTryPutFailedException()); + + return; + } + + if (topLocked) { + assert !F.isEmpty(remapKeys) : remapKeys; + + CachePartialUpdateCheckedException e = + new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + + ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( + "Failed to update keys, topology changed while execute atomic update inside transaction."); + + cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); + + e.add(remapKeys, cause); + + onDone(e); - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(final IgniteInternalFuture<?> fut) { + return; + } + + IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.affinity().affinityReadyFuture(remapTopVer); + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { try { - fut.get(); + AffinityTopologyVersion topVer = fut.get(); - mapOnTopology(); + map(topVer); } catch (IgniteCheckedException e) { onDone(e); @@ -757,8 +746,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return; } - assert fut0 == null; - if (rcvAll) onDone(opRes0, err0); } @@ -780,11 +767,142 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** + * @param topVer Topology version. + */ + void map(AffinityTopologyVersion topVer) { + Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); + + if (F.isEmpty(topNodes)) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + + "left the grid).")); + + return; + } + + Exception err = null; + Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null; + + int size = keys.size(); + + synchronized (this) { + assert futVer == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; + + this.topVer = topVer; + + futVer = cctx.versions().next(topVer); + + if (storeFuture()) + cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this); + + // Assign version on near node in CLOCK ordering mode even if fastMap is false. + if (updVer == null) + updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null; + + if (updVer != null && log.isDebugEnabled()) + log.debug("Assigned fast-map version for update on near node: " + updVer); + + try { + if (size == 1 && !fastMap) { + assert remapKeys == null || remapKeys.size() == 1; + + singleReq = mapSingleUpdate(); + } + else { + pendingMappings = mapUpdate(topNodes); + + if (pendingMappings.size() == 1) + singleReq = F.firstValue(pendingMappings); + else { + if (syncMode == PRIMARY_SYNC) { + mappings = U.newHashMap(pendingMappings.size()); + + for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { + if (req.hasPrimary()) + mappings.put(req.nodeId(), req); + } + } + else + mappings = new HashMap<>(pendingMappings); + + assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this; + } + } + + remapKeys = null; + } + catch (Exception e) { + err = e; + } + } + + if (err != null) { + onDone(err); + + return; + } + + // Optimize mapping for single key. + if (singleReq != null) + mapSingle(singleReq.nodeId(), singleReq); + else { + assert pendingMappings != null; + + if (size == 0) + onDone(new GridCacheReturn(cctx, true, null, true)); + else + doUpdate(pendingMappings); + } + } + + /** + * @param topVer Topology version. + * @return Future. + */ + @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) { + if (this.topVer == AffinityTopologyVersion.ZERO) + return null; + + if (this.topVer.compareTo(topVer) < 0) { + if (topCompleteFut == null) + topCompleteFut = new GridFutureAdapter<>(); + + return topCompleteFut; + } + + return null; + } + + /** + * @return Future version. + */ + GridCacheVersion onFutureDone() { + GridCacheVersion ver0; + + GridFutureAdapter<Void> fut0; + + synchronized (this) { + fut0 = topCompleteFut; + + topCompleteFut = null; + + ver0 = futVer; + + futVer = null; + } + + if (fut0 != null) + fut0.onDone(); + + return ver0; + } + + /** * @param topNodes Cache nodes. * @return Mapping. * @throws Exception If failed. */ - Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception { + private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception { Iterator<?> it = null; if (vals != null) @@ -906,7 +1024,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @return Request. * @throws Exception If failed. */ - GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception { + private GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception { Object key = F.first(keys); Object val; @@ -993,134 +1111,40 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** - * @param topVer Topology version. + * @param ret Result from single node. */ - void map(AffinityTopologyVersion topVer) { - Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); - - if (F.isEmpty(topNodes)) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + - "left the grid).")); - - return; - } - - Exception err = null; - Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null; - - int size = keys.size(); - - synchronized (this) { - assert futVer == null : this; - assert this.topVer == AffinityTopologyVersion.ZERO : this; - - this.topVer = topVer; - - futVer = cctx.versions().next(topVer); - - if (storeFuture()) - cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this); - - // Assign version on near node in CLOCK ordering mode even if fastMap is false. - if (updVer == null) - updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null; - - if (updVer != null && log.isDebugEnabled()) - log.debug("Assigned fast-map version for update on near node: " + updVer); - - try { - if (size == 1 && !fastMap) { - assert remapKeys == null || remapKeys.size() == 1; - - singleReq = mapSingleUpdate(); - } - else { - pendingMappings = mapUpdate(topNodes); - - if (pendingMappings.size() == 1) - singleReq = F.firstValue(pendingMappings); - else { - if (syncMode == PRIMARY_SYNC) { - mappings = U.newHashMap(pendingMappings.size()); - - for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { - if (req.hasPrimary()) - mappings.put(req.nodeId(), req); - } - } - else - mappings = new HashMap<>(pendingMappings); - - assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this; - } - } - - remapKeys = null; - } - catch (Exception e) { - err = e; - } - } - - if (err != null) { - onDone(err); - - return; - } - - // Optimize mapping for single key. - if (singleReq != null) - mapSingle(singleReq.nodeId(), singleReq); - else { - assert pendingMappings != null; + @SuppressWarnings("unchecked") + private void addInvokeResults(GridCacheReturn ret) { + assert op == TRANSFORM : op; + assert ret.value() == null || ret.value() instanceof Map : ret.value(); - if (size == 0) - onDone(new GridCacheReturn(cctx, true, null, true)); + if (ret.value() != null) { + if (opRes != null) + opRes.mergeEntryProcessResults(ret); else - doUpdate(pendingMappings); - } - } - - /** - * @param topVer Topology version. - * @return Future. - */ - @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) { - if (this.topVer == AffinityTopologyVersion.ZERO) - return null; - - if (this.topVer.compareTo(topVer) < 0) { - if (topCompleteFut == null) - topCompleteFut = new GridFutureAdapter<>(); - - return topCompleteFut; + opRes = ret; } - - return null; } /** - * @return Future version. + * @param failedKeys Failed keys. + * @param topVer Topology version for failed update. + * @param err Error cause. */ - GridCacheVersion onFutureDone() { - GridCacheVersion ver0; - - GridFutureAdapter<Void> fut0; - - synchronized (this) { - fut0 = topCompleteFut; - - topCompleteFut = null; + private void addFailedKeys(Collection<KeyCacheObject> failedKeys, + AffinityTopologyVersion topVer, + Throwable err) { + CachePartialUpdateCheckedException err0 = this.err; - ver0 = futVer; + if (err0 == null) + err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); - futVer = null; - } + Collection<Object> keys = new ArrayList<>(failedKeys.size()); - if (fut0 != null) - fut0.onDone(); + for (KeyCacheObject key : failedKeys) + keys.add(key.value(cctx.cacheObjectContext(), false)); - return ver0; + err0.add(keys, err, topVer); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a965d439/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java index 041a0f9..276f89c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java @@ -175,8 +175,11 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr for (int i = 0; i < keysCnt; i++) { map.put(i, val); - if (map.size() == 100 || i == keysCnt - 1) + if (map.size() == 100 || i == keysCnt - 1) { cache.putAll(map); + + map.clear(); + } } for (int i = 0; i < keysCnt; i++)
