IGNITE-3007: Slight refactoring in GridNearAtomicUpdateFuture.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c19d38b8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c19d38b8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c19d38b8 Branch: refs/heads/ignite-2926 Commit: c19d38b83a3fe4615f784ed2a22a2886a96b686a Parents: 9791d60 Author: vozerov-gridgain <[email protected]> Authored: Fri Apr 15 12:17:59 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Apr 15 12:17:59 2016 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 1235 +++++++++--------- 2 files changed, 605 insertions(+), 632 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c19d38b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 81109e3..4f8b32c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -2896,7 +2896,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); if (fut != null) - fut.onResult(nodeId, res); + fut.onResult(nodeId, res, false); else U.warn(log, "Failed to find near update future for update response (will ignore) " + "[nodeId=" + nodeId + ", res=" + res + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/c19d38b8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index c4f48b0..9955df7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -146,8 +146,42 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** Remap count. */ private int remapCnt; - /** State. */ - private final UpdateState state; + /** Mutex to synchronize state updates. */ + private final Object mux = new Object(); + + /** Current topology version. */ + private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; + + /** */ + private GridCacheVersion updVer; + + /** Topology version when got mapping error. */ + private AffinityTopologyVersion mapErrTopVer; + + /** Mappings if operations is mapped to more than one node. */ + @GridToStringInclude + private Map<UUID, GridNearAtomicUpdateRequest> mappings; + + /** */ + private int resCnt; + + /** Error. */ + private CachePartialUpdateCheckedException err; + + /** Future ID. */ + private GridCacheVersion futVer; + + /** Completion future for a particular topology version. */ + private GridFutureAdapter<Void> topCompleteFut; + + /** Keys to remap. */ + private Collection<KeyCacheObject> remapKeys; + + /** Not null is operation is mapped to single node. */ + private GridNearAtomicUpdateRequest singleReq; + + /** Operation result. */ + private GridCacheReturn opRes; /** * @param cctx Cache context. @@ -229,8 +263,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> remapCnt = 1; this.remapCnt = remapCnt; - - state = new UpdateState(); } /** {@inheritDoc} */ @@ -240,7 +272,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** {@inheritDoc} */ @Override public GridCacheVersion version() { - return state.futureVersion(); + synchronized (mux) { + return futVer; + } } /** @@ -258,7 +292,33 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - state.onNodeLeft(nodeId); + GridNearAtomicUpdateResponse res = null; + + synchronized (mux) { + GridNearAtomicUpdateRequest req; + + if (singleReq != null) + req = singleReq.nodeId().equals(nodeId) ? singleReq : null; + else + req = mappings != null ? mappings.get(nodeId) : null; + + if (req != null && req.response() == null) { + res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + nodeId, + req.futureVersion(), + cctx.deploymentEnabled()); + + ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " + + "before response is received: " + nodeId); + + e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion())); + + res.addFailedKeys(req.keys(), e); + } + } + + if (res != null) + onResult(nodeId, res, true); return false; } @@ -287,14 +347,14 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> // Cannot remap. remapCnt = 1; - state.map(topVer, null); + map(topVer, null); } } /** {@inheritDoc} */ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { if (waitForPartitionExchange()) { - GridFutureAdapter<Void> fut = state.completeFuture(topVer); + GridFutureAdapter<Void> fut = completeFuture0(topVer); if (fut != null && isDone()) { fut.onDone(); @@ -323,7 +383,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> retval = Collections.emptyMap(); if (super.onDone(retval, err)) { - GridCacheVersion futVer = state.onFutureDone(); + GridCacheVersion futVer = onFutureDone(); if (futVer != null) cctx.mvcc().removeAtomicFuture(futVer); @@ -339,9 +399,208 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * * @param nodeId Node ID. * @param res Update response. + * @param nodeErr {@code True} if response was created on node failure. */ - public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { - state.onResult(nodeId, res, false); + @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) + public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { + GridNearAtomicUpdateRequest req; + + AffinityTopologyVersion remapTopVer = null; + + GridCacheReturn opRes0 = null; + CachePartialUpdateCheckedException err0 = null; + + boolean rcvAll; + + GridFutureAdapter<?> fut0 = null; + + synchronized (mux) { + if (!res.futureVersion().equals(futVer)) + return; + + if (singleReq != null) { + if (!singleReq.nodeId().equals(nodeId)) + return; + + req = singleReq; + + singleReq = null; + + rcvAll = true; + } + else { + req = mappings != null ? mappings.get(nodeId) : null; + + if (req != null && req.onResponse(res)) { + resCnt++; + + rcvAll = mappings.size() == resCnt; + } + else + return; + } + + assert req != null && req.topologyVersion().equals(topVer) : req; + + if (res.remapKeys() != null) { + assert !fastMap || cctx.kernalContext().clientNode(); + + if (remapKeys == null) + remapKeys = U.newHashSet(res.remapKeys().size()); + + remapKeys.addAll(res.remapKeys()); + + if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) + mapErrTopVer = req.topologyVersion(); + } + else if (res.error() != null) { + if (res.failedKeys() != null) + addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error()); + } + else { + if (!req.fastMap() || req.hasPrimary()) { + GridCacheReturn ret = res.returnValue(); + + if (op == TRANSFORM) { + if (ret != null) + addInvokeResults(ret); + } + else + opRes = ret; + } + } + + if (rcvAll) { + if (remapKeys != null) { + assert mapErrTopVer != null; + + remapTopVer = cctx.shared().exchange().topologyVersion(); + } + else { + if (err != null && + X.hasCause(err, CachePartialUpdateCheckedException.class) && + X.hasCause(err, ClusterTopologyCheckedException.class) && + storeFuture() && + --remapCnt > 0) { + ClusterTopologyCheckedException topErr = + X.cause(err, ClusterTopologyCheckedException.class); + + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + CachePartialUpdateCheckedException cause = + X.cause(err, CachePartialUpdateCheckedException.class); + + assert cause != null && cause.topologyVersion() != null : err; + + remapTopVer = + new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); + + err = null; + + Collection<Object> failedKeys = cause.failedKeys(); + + remapKeys = new ArrayList<>(failedKeys.size()); + + for (Object key : failedKeys) + remapKeys.add(cctx.toCacheKeyObject(key)); + + updVer = null; + } + } + } + + if (remapTopVer == null) { + err0 = err; + opRes0 = opRes; + } + else { + fut0 = topCompleteFut; + + topCompleteFut = null; + + cctx.mvcc().removeAtomicFuture(futVer); + + futVer = null; + topVer = AffinityTopologyVersion.ZERO; + } + } + } + + if (res.error() != null && res.failedKeys() == null) { + onDone(res.error()); + + return; + } + + if (rcvAll && nearEnabled) { + if (mappings != null) { + for (GridNearAtomicUpdateRequest req0 : mappings.values()) { + GridNearAtomicUpdateResponse res0 = req0.response(); + + assert res0 != null : req0; + + updateNear(req0, res0); + } + } + else if (!nodeErr) + updateNear(req, res); + } + + if (remapTopVer != null) { + if (fut0 != null) + fut0.onDone(); + + if (!waitTopFut) { + onDone(new GridCacheTryPutFailedException()); + + return; + } + + if (topLocked) { + assert !F.isEmpty(remapKeys) : remapKeys; + + CachePartialUpdateCheckedException e = + new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + + ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( + "Failed to update keys, topology changed while execute atomic update inside transaction."); + + cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); + + e.add(remapKeys, cause); + + onDone(e); + + return; + } + + IgniteInternalFuture<AffinityTopologyVersion> fut = + cctx.shared().exchange().affinityReadyFuture(remapTopVer); + + if (fut == null) + fut = new GridFinishedFuture<>(remapTopVer); + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + try { + AffinityTopologyVersion topVer = fut.get(); + + map(topVer, remapKeys); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + } + }); + + return; + } + + if (rcvAll) + onDone(opRes0, err0); } /** @@ -414,7 +673,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> cache.topology().readUnlock(); } - state.map(topVer, null); + map(topVer, null); } /** @@ -457,7 +716,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> cache.updateAllAsyncInternal(nodeId, req, new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { - onResult(res.nodeId(), res); + onResult(res.nodeId(), res, false); } }); } @@ -472,7 +731,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> onDone(new GridCacheReturn(cctx, true, true, null, true)); } catch (IgniteCheckedException e) { - state.onSendError(req, e); + onSendError(req, e); } } } @@ -503,7 +762,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); } catch (IgniteCheckedException e) { - state.onSendError(req, e); + onSendError(req, e); } } } @@ -512,7 +771,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { - onResult(res.nodeId(), res); + onResult(res.nodeId(), res, false); } }); } @@ -522,428 +781,144 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** - * + * @param req Request. + * @param e Error. */ - private class UpdateState { - /** Current topology version. */ - private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; - - /** */ - private GridCacheVersion updVer; - - /** Topology version when got mapping error. */ - private AffinityTopologyVersion mapErrTopVer; + void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { + synchronized (mux) { + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + req.nodeId(), + req.futureVersion(), + cctx.deploymentEnabled()); - /** Mappings if operations is mapped to more than one node. */ - @GridToStringInclude - private Map<UUID, GridNearAtomicUpdateRequest> mappings; + res.addFailedKeys(req.keys(), e); - /** */ - private int resCnt; - - /** Error. */ - private CachePartialUpdateCheckedException err; - - /** Future ID. */ - private GridCacheVersion futVer; - - /** Completion future for a particular topology version. */ - private GridFutureAdapter<Void> topCompleteFut; - - /** Keys to remap. */ - private Collection<KeyCacheObject> remapKeys; + onResult(req.nodeId(), res, true); + } + } - /** Not null is operation is mapped to single node. */ - private GridNearAtomicUpdateRequest singleReq; + /** + * @param topVer Topology version. + * @param remapKeys Keys to remap. + */ + void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) { + Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); - /** Operation result. */ - private GridCacheReturn opRes; + if (F.isEmpty(topNodes)) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + + "left the grid).")); - /** - * @return Future version. - */ - @Nullable synchronized GridCacheVersion futureVersion() { - return futVer; + return; } - /** - * @param nodeId Left node ID. - */ - void onNodeLeft(UUID nodeId) { - GridNearAtomicUpdateResponse res = null; + Exception err = null; + GridNearAtomicUpdateRequest singleReq0 = null; + Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null; - synchronized (this) { - GridNearAtomicUpdateRequest req; + int size = keys.size(); - if (singleReq != null) - req = singleReq.nodeId().equals(nodeId) ? singleReq : null; - else - req = mappings != null ? mappings.get(nodeId) : null; + GridCacheVersion futVer = cctx.versions().next(topVer); - if (req != null && req.response() == null) { - res = new GridNearAtomicUpdateResponse(cctx.cacheId(), - nodeId, - req.futureVersion(), - cctx.deploymentEnabled()); + GridCacheVersion updVer; - ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " + - "before response is received: " + nodeId); + // Assign version on near node in CLOCK ordering mode even if fastMap is false. + if (cctx.config().getAtomicWriteOrderMode() == CLOCK) { + updVer = this.updVer; - e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion())); + if (updVer == null) { + updVer = cctx.versions().next(topVer); - res.addFailedKeys(req.keys(), e); - } + if (log.isDebugEnabled()) + log.debug("Assigned fast-map version for update on near node: " + updVer); } - - if (res != null) - onResult(nodeId, res, true); } + else + updVer = null; - /** - * @param nodeId Node ID. - * @param res Response. - * @param nodeErr {@code True} if response was created on node failure. - */ - @SuppressWarnings("unchecked") - void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { - GridNearAtomicUpdateRequest req; - - AffinityTopologyVersion remapTopVer = null; - - GridCacheReturn opRes0 = null; - CachePartialUpdateCheckedException err0 = null; - - boolean rcvAll; - - GridFutureAdapter<?> fut0 = null; - - synchronized (this) { - if (!res.futureVersion().equals(futVer)) - return; - - if (singleReq != null) { - if (!singleReq.nodeId().equals(nodeId)) - return; - - req = singleReq; - - singleReq = null; - - rcvAll = true; - } - else { - req = mappings != null ? mappings.get(nodeId) : null; - - if (req != null && req.onResponse(res)) { - resCnt++; - - rcvAll = mappings.size() == resCnt; - } - else - return; - } - - assert req != null && req.topologyVersion().equals(topVer) : req; - - if (res.remapKeys() != null) { - assert !fastMap || cctx.kernalContext().clientNode(); - - if (remapKeys == null) - remapKeys = U.newHashSet(res.remapKeys().size()); - - remapKeys.addAll(res.remapKeys()); + try { + if (size == 1 && !fastMap) { + assert remapKeys == null || remapKeys.size() == 1; - if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) - mapErrTopVer = req.topologyVersion(); - } - else if (res.error() != null) { - if (res.failedKeys() != null) - addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error()); - } + singleReq0 = mapSingleUpdate(topVer, futVer, updVer); + } + else { + Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes, + topVer, + futVer, + updVer, + remapKeys); + + if (pendingMappings.size() == 1) + singleReq0 = F.firstValue(pendingMappings); else { - if (!req.fastMap() || req.hasPrimary()) { - GridCacheReturn ret = res.returnValue(); + if (syncMode == PRIMARY_SYNC) { + mappings0 = U.newHashMap(pendingMappings.size()); - if (op == TRANSFORM) { - if (ret != null) - addInvokeResults(ret); + for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { + if (req.hasPrimary()) + mappings0.put(req.nodeId(), req); } - else - opRes = ret; } - } - - if (rcvAll) { - if (remapKeys != null) { - assert mapErrTopVer != null; - - remapTopVer = cctx.shared().exchange().topologyVersion(); - } - else { - if (err != null && - X.hasCause(err, CachePartialUpdateCheckedException.class) && - X.hasCause(err, ClusterTopologyCheckedException.class) && - storeFuture() && - --remapCnt > 0) { - ClusterTopologyCheckedException topErr = - X.cause(err, ClusterTopologyCheckedException.class); - - if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { - CachePartialUpdateCheckedException cause = - X.cause(err, CachePartialUpdateCheckedException.class); - - assert cause != null && cause.topologyVersion() != null : err; - - remapTopVer = - new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); - - err = null; - - Collection<Object> failedKeys = cause.failedKeys(); - - remapKeys = new ArrayList<>(failedKeys.size()); - - for (Object key : failedKeys) - remapKeys.add(cctx.toCacheKeyObject(key)); - - updVer = null; - } - } - } - - if (remapTopVer == null) { - err0 = err; - opRes0 = opRes; - } - else { - fut0 = topCompleteFut; - - topCompleteFut = null; - - cctx.mvcc().removeAtomicFuture(futVer); + else + mappings0 = pendingMappings; - futVer = null; - topVer = AffinityTopologyVersion.ZERO; - } + assert !mappings0.isEmpty() || size == 0 : this; } } - if (res.error() != null && res.failedKeys() == null) { - onDone(res.error()); - - return; - } + synchronized (mux) { + assert this.futVer == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; - if (rcvAll && nearEnabled) { - if (mappings != null) { - for (GridNearAtomicUpdateRequest req0 : mappings.values()) { - GridNearAtomicUpdateResponse res0 = req0.response(); + this.topVer = topVer; + this.updVer = updVer; + this.futVer = futVer; - assert res0 != null : req0; + resCnt = 0; - updateNear(req0, res0); - } - } - else if (!nodeErr) - updateNear(req, res); - } - - if (remapTopVer != null) { - if (fut0 != null) - fut0.onDone(); - - if (!waitTopFut) { - onDone(new GridCacheTryPutFailedException()); + singleReq = singleReq0; + mappings = mappings0; - return; - } - - if (topLocked) { - assert !F.isEmpty(remapKeys) : remapKeys; - - CachePartialUpdateCheckedException e = - new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); - - ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( - "Failed to update keys, topology changed while execute atomic update inside transaction."); - - cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); - - e.add(remapKeys, cause); - - onDone(e); - - return; - } - - IgniteInternalFuture<AffinityTopologyVersion> fut = - cctx.shared().exchange().affinityReadyFuture(remapTopVer); - - if (fut == null) - fut = new GridFinishedFuture<>(remapTopVer); - - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - try { - AffinityTopologyVersion topVer = fut.get(); - - map(topVer, remapKeys); - } - catch (IgniteCheckedException e) { - onDone(e); - } - } - }); - } - }); - - return; + this.remapKeys = null; } - - if (rcvAll) - onDone(opRes0, err0); + } + catch (Exception e) { + err = e; } - /** - * @param req Request. - * @param e Error. - */ - void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { - synchronized (this) { - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), - req.nodeId(), - req.futureVersion(), - cctx.deploymentEnabled()); - - res.addFailedKeys(req.keys(), e); + if (err != null) { + onDone(err); - onResult(req.nodeId(), res, true); - } + return; } - /** - * @param topVer Topology version. - * @param remapKeys Keys to remap. - */ - void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) { - Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); - - if (F.isEmpty(topNodes)) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + - "left the grid).")); + if (storeFuture()) { + if (!cctx.mvcc().addAtomicFuture(futVer, this)) { + assert isDone() : this; return; } + } - Exception err = null; - GridNearAtomicUpdateRequest singleReq0 = null; - Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null; - - int size = keys.size(); - - GridCacheVersion futVer = cctx.versions().next(topVer); - - GridCacheVersion updVer; - - // Assign version on near node in CLOCK ordering mode even if fastMap is false. - if (cctx.config().getAtomicWriteOrderMode() == CLOCK) { - updVer = this.updVer; - - if (updVer == null) { - updVer = cctx.versions().next(topVer); + // Optimize mapping for single key. + if (singleReq0 != null) + mapSingle(singleReq0.nodeId(), singleReq0); + else { + assert mappings0 != null; - if (log.isDebugEnabled()) - log.debug("Assigned fast-map version for update on near node: " + updVer); - } - } + if (size == 0) + onDone(new GridCacheReturn(cctx, true, true, null, true)); else - updVer = null; - - try { - if (size == 1 && !fastMap) { - assert remapKeys == null || remapKeys.size() == 1; - - singleReq0 = mapSingleUpdate(topVer, futVer, updVer); - } - else { - Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes, - topVer, - futVer, - updVer, - remapKeys); - - if (pendingMappings.size() == 1) - singleReq0 = F.firstValue(pendingMappings); - else { - if (syncMode == PRIMARY_SYNC) { - mappings0 = U.newHashMap(pendingMappings.size()); - - for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { - if (req.hasPrimary()) - mappings0.put(req.nodeId(), req); - } - } - else - mappings0 = pendingMappings; - - assert !mappings0.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this; - } - } - - synchronized (this) { - assert this.futVer == null : this; - assert this.topVer == AffinityTopologyVersion.ZERO : this; - - this.topVer = topVer; - this.updVer = updVer; - this.futVer = futVer; - - resCnt = 0; - - singleReq = singleReq0; - mappings = mappings0; - - this.remapKeys = null; - } - } - catch (Exception e) { - err = e; - } - - if (err != null) { - onDone(err); - - return; - } - - if (storeFuture()) { - if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) { - assert isDone() : GridNearAtomicUpdateFuture.this; - - return; - } - } - - // Optimize mapping for single key. - if (singleReq0 != null) - mapSingle(singleReq0.nodeId(), singleReq0); - else { - assert mappings0 != null; - - if (size == 0) - onDone(new GridCacheReturn(cctx, true, true, null, true)); - else - doUpdate(mappings0); - } + doUpdate(mappings0); } + } - /** - * @param topVer Topology version. - * @return Future. - */ - @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) { + /** + * @param topVer Topology version. + * @return Future. + */ + @Nullable GridFutureAdapter<Void> completeFuture0(AffinityTopologyVersion topVer) { + synchronized (mux) { if (this.topVer == AffinityTopologyVersion.ZERO) return null; @@ -956,176 +931,67 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return null; } + } - /** - * @return Future version. - */ - GridCacheVersion onFutureDone() { - GridCacheVersion ver0; - - GridFutureAdapter<Void> fut0; - - synchronized (this) { - fut0 = topCompleteFut; + /** + * @return Future version. + */ + GridCacheVersion onFutureDone() { + GridCacheVersion ver0; - topCompleteFut = null; + GridFutureAdapter<Void> fut0; - ver0 = futVer; + synchronized (mux) { + fut0 = topCompleteFut; - futVer = null; - } + topCompleteFut = null; - if (fut0 != null) - fut0.onDone(); + ver0 = futVer; - return ver0; + futVer = null; } - /** - * @param topNodes Cache nodes. - * @param topVer Topology version. - * @param futVer Future version. - * @param updVer Update version. - * @param remapKeys Keys to remap. - * @return Mapping. - * @throws Exception If failed. - */ - private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes, - AffinityTopologyVersion topVer, - GridCacheVersion futVer, - @Nullable GridCacheVersion updVer, - @Nullable Collection<KeyCacheObject> remapKeys) throws Exception { - Iterator<?> it = null; - - if (vals != null) - it = vals.iterator(); - - Iterator<GridCacheDrInfo> conflictPutValsIt = null; - - if (conflictPutVals != null) - conflictPutValsIt = conflictPutVals.iterator(); - - Iterator<GridCacheVersion> conflictRmvValsIt = null; - - if (conflictRmvVals != null) - conflictRmvValsIt = conflictRmvVals.iterator(); - - Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size()); - - // Create mappings first, then send messages. - for (Object key : keys) { - if (key == null) - throw new NullPointerException("Null key."); - - Object val; - GridCacheVersion conflictVer; - long conflictTtl; - long conflictExpireTime; - - if (vals != null) { - val = it.next(); - conflictVer = null; - conflictTtl = CU.TTL_NOT_CHANGED; - conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; - - if (val == null) - throw new NullPointerException("Null value."); - } - else if (conflictPutVals != null) { - GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); + if (fut0 != null) + fut0.onDone(); - val = conflictPutVal.valueEx(); - conflictVer = conflictPutVal.version(); - conflictTtl = conflictPutVal.ttl(); - conflictExpireTime = conflictPutVal.expireTime(); - } - else if (conflictRmvVals != null) { - val = null; - conflictVer = conflictRmvValsIt.next(); - conflictTtl = CU.TTL_NOT_CHANGED; - conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; - } - else { - val = null; - conflictVer = null; - conflictTtl = CU.TTL_NOT_CHANGED; - conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; - } - - if (val == null && op != GridCacheOperation.DELETE) - continue; - - KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); - - if (remapKeys != null && !remapKeys.contains(cacheKey)) - continue; + return ver0; + } - if (op != TRANSFORM) - val = cctx.toCacheObject(val); + /** + * @param topNodes Cache nodes. + * @param topVer Topology version. + * @param futVer Future version. + * @param updVer Update version. + * @param remapKeys Keys to remap. + * @return Mapping. + * @throws Exception If failed. + */ + private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes, + AffinityTopologyVersion topVer, + GridCacheVersion futVer, + @Nullable GridCacheVersion updVer, + @Nullable Collection<KeyCacheObject> remapKeys) throws Exception { + Iterator<?> it = null; - Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); + if (vals != null) + it = vals.iterator(); - if (affNodes.isEmpty()) - throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid)."); + Iterator<GridCacheDrInfo> conflictPutValsIt = null; - int i = 0; - - for (ClusterNode affNode : affNodes) { - if (affNode == null) - throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid)."); - - UUID nodeId = affNode.id(); - - GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); - - if (mapped == null) { - mapped = new GridNearAtomicUpdateRequest( - cctx.cacheId(), - nodeId, - futVer, - fastMap, - updVer, - topVer, - topLocked, - syncMode, - op, - retval, - expiryPlc, - invokeArgs, - filter, - subjId, - taskNameHash, - skipStore, - keepBinary, - cctx.kernalContext().clientNode(), - cctx.deploymentEnabled(), - keys.size()); - - pendingMappings.put(nodeId, mapped); - } + if (conflictPutVals != null) + conflictPutValsIt = conflictPutVals.iterator(); - mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0); + Iterator<GridCacheVersion> conflictRmvValsIt = null; - i++; - } - } + if (conflictRmvVals != null) + conflictRmvValsIt = conflictRmvVals.iterator(); - return pendingMappings; - } + Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size()); - /** - * @param topVer Topology version. - * @param futVer Future version. - * @param updVer Update version. - * @return Request. - * @throws Exception If failed. - */ - private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, - GridCacheVersion futVer, - @Nullable GridCacheVersion updVer) throws Exception { - Object key = F.first(keys); + // Create mappings first, then send messages. + for (Object key : keys) { + if (key == null) + throw new NullPointerException("Null key."); Object val; GridCacheVersion conflictVer; @@ -1133,131 +999,238 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> long conflictExpireTime; if (vals != null) { - // Regular PUT. - val = F.first(vals); + val = it.next(); conflictVer = null; conflictTtl = CU.TTL_NOT_CHANGED; conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + + if (val == null) + throw new NullPointerException("Null value."); } else if (conflictPutVals != null) { - // Conflict PUT. - GridCacheDrInfo conflictPutVal = F.first(conflictPutVals); + GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); val = conflictPutVal.valueEx(); conflictVer = conflictPutVal.version(); - conflictTtl = conflictPutVal.ttl(); + conflictTtl = conflictPutVal.ttl(); conflictExpireTime = conflictPutVal.expireTime(); } else if (conflictRmvVals != null) { - // Conflict REMOVE. val = null; - conflictVer = F.first(conflictRmvVals); + conflictVer = conflictRmvValsIt.next(); conflictTtl = CU.TTL_NOT_CHANGED; conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; } else { - // Regular REMOVE. val = null; conflictVer = null; conflictTtl = CU.TTL_NOT_CHANGED; conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; } - // We still can get here if user pass map with single element. - if (key == null) - throw new NullPointerException("Null key."); - if (val == null && op != GridCacheOperation.DELETE) - throw new NullPointerException("Null value."); + continue; KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + if (remapKeys != null && !remapKeys.contains(cacheKey)) + continue; + if (op != TRANSFORM) val = cctx.toCacheObject(val); - ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); - - if (primary == null) - throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + - "left the grid)."); - - GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( - cctx.cacheId(), - primary.id(), - futVer, - fastMap, - updVer, - topVer, - topLocked, - syncMode, - op, - retval, - expiryPlc, - invokeArgs, - filter, - subjId, - taskNameHash, - skipStore, - keepBinary, - cctx.kernalContext().clientNode(), - cctx.deploymentEnabled(), - 1); - - req.addUpdateEntry(cacheKey, - val, - conflictTtl, - conflictExpireTime, - conflictVer, - true); - - return req; - } + Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); - /** - * @param ret Result from single node. - */ - @SuppressWarnings("unchecked") - private void addInvokeResults(GridCacheReturn ret) { - assert op == TRANSFORM : op; - assert ret.value() == null || ret.value() instanceof Map : ret.value(); - - if (ret.value() != null) { - if (opRes != null) - opRes.mergeEntryProcessResults(ret); - else - opRes = ret; - } - } + if (affNodes.isEmpty()) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid)."); + + int i = 0; + + for (ClusterNode affNode : affNodes) { + if (affNode == null) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid)."); - /** - * @param failedKeys Failed keys. - * @param topVer Topology version for failed update. - * @param err Error cause. - */ - private void addFailedKeys(Collection<KeyCacheObject> failedKeys, - AffinityTopologyVersion topVer, - Throwable err) { - CachePartialUpdateCheckedException err0 = this.err; + UUID nodeId = affNode.id(); - if (err0 == null) - err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); - Collection<Object> keys = new ArrayList<>(failedKeys.size()); + if (mapped == null) { + mapped = new GridNearAtomicUpdateRequest( + cctx.cacheId(), + nodeId, + futVer, + fastMap, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled(), + keys.size()); + + pendingMappings.put(nodeId, mapped); + } + + mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0); + + i++; + } + } - for (KeyCacheObject key : failedKeys) - keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); + return pendingMappings; + } - err0.add(keys, err, topVer); + /** + * @param topVer Topology version. + * @param futVer Future version. + * @param updVer Update version. + * @return Request. + * @throws Exception If failed. + */ + private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, + GridCacheVersion futVer, + @Nullable GridCacheVersion updVer) throws Exception { + Object key = F.first(keys); + + Object val; + GridCacheVersion conflictVer; + long conflictTtl; + long conflictExpireTime; + + if (vals != null) { + // Regular PUT. + val = F.first(vals); + conflictVer = null; + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; } + else if (conflictPutVals != null) { + // Conflict PUT. + GridCacheDrInfo conflictPutVal = F.first(conflictPutVals); + + val = conflictPutVal.valueEx(); + conflictVer = conflictPutVal.version(); + conflictTtl = conflictPutVal.ttl(); + conflictExpireTime = conflictPutVal.expireTime(); + } + else if (conflictRmvVals != null) { + // Conflict REMOVE. + val = null; + conflictVer = F.first(conflictRmvVals); + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + } + else { + // Regular REMOVE. + val = null; + conflictVer = null; + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + } + + // We still can get here if user pass map with single element. + if (key == null) + throw new NullPointerException("Null key."); + + if (val == null && op != GridCacheOperation.DELETE) + throw new NullPointerException("Null value."); + + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + + if (op != TRANSFORM) + val = cctx.toCacheObject(val); + + ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); + + if (primary == null) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + + "left the grid)."); + + GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( + cctx.cacheId(), + primary.id(), + futVer, + fastMap, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled(), + 1); + + req.addUpdateEntry(cacheKey, + val, + conflictTtl, + conflictExpireTime, + conflictVer, + true); + + return req; + } - /** {@inheritDoc} */ - @Override public synchronized String toString() { - return S.toString(UpdateState.class, this); + /** + * @param ret Result from single node. + */ + @SuppressWarnings("unchecked") + private void addInvokeResults(GridCacheReturn ret) { + assert op == TRANSFORM : op; + assert ret.value() == null || ret.value() instanceof Map : ret.value(); + + if (ret.value() != null) { + if (opRes != null) + opRes.mergeEntryProcessResults(ret); + else + opRes = ret; } } + /** + * @param failedKeys Failed keys. + * @param topVer Topology version for failed update. + * @param err Error cause. + */ + private void addFailedKeys(Collection<KeyCacheObject> failedKeys, + AffinityTopologyVersion topVer, + Throwable err) { + CachePartialUpdateCheckedException err0 = this.err; + + if (err0 == null) + err0 = this.err = + new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + + Collection<Object> keys = new ArrayList<>(failedKeys.size()); + + for (KeyCacheObject key : failedKeys) + keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); + + err0.add(keys, err, topVer); + } + /** {@inheritDoc} */ public String toString() { - return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString()); + synchronized (mux) { + return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString()); + } } }
