# ignite-1124
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4109bf4c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4109bf4c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4109bf4c Branch: refs/heads/ignite-1124 Commit: 4109bf4c47a3c727666ec5b49bd90d63b8e38846 Parents: ca03efd Author: sboikov <[email protected]> Authored: Wed Aug 26 12:25:37 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed Aug 26 12:25:37 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMvccManager.java | 62 +- .../dht/atomic/GridDhtAtomicCache.java | 6 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 1550 +++++++++--------- .../util/future/GridCompoundFuture.java | 3 +- .../apache/ignite/internal/util/typedef/X.java | 1 + .../IgniteCachePutRetryAbstractSelfTest.java | 271 ++- .../dht/IgniteCachePutRetryAtomicSelfTest.java | 77 +- ...gniteCachePutRetryTransactionalSelfTest.java | 14 +- .../tcp/IgniteCacheSslStartStopSelfTest.java | 9 +- 9 files changed, 1095 insertions(+), 898 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 6a8c6fe..bbac42b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -196,8 +196,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { cacheFut.onNodeLeft(discoEvt.eventNode().id()); - if (cacheFut.isCancelled() || cacheFut.isDone()) - atomicFuts.remove(cacheFut.futureId(), fut); + if (cacheFut.isCancelled() || cacheFut.isDone()) { + GridCacheVersion futVer = cacheFut.version(); + + if (futVer != null) + atomicFuts.remove(futVer, fut); + } } } } @@ -347,16 +351,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** - * @param fut Future to check. - * @return {@code True} if future is registered. - */ - public boolean hasFuture(GridCacheFuture<?> fut) { - assert fut != null; - - return future(fut.version(), fut.futureId()) != null; - } - - /** * @param futVer Future ID. * @param fut Future. */ @@ -565,6 +559,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @param ver Version. * @return All futures for given lock version. */ + @SuppressWarnings("unchecked") public <T> Collection<? extends IgniteInternalFuture<T>> futures(GridCacheVersion ver) { Collection c = futs.get(ver); @@ -572,6 +567,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @param cacheCtx Cache context. * @param ver Lock version to check. * @return {@code True} if lock had been removed. */ @@ -580,6 +576,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @param cacheCtx Cache context. * @param ver Obsolete entry version. * @return {@code True} if added. */ @@ -688,27 +685,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** - * @param keys Keys. - * @param base Base version. - * @return Versions that are less than {@code base} whose keys are in the {@code keys} collection. - */ - public Collection<GridCacheVersion> localDhtPendingVersions(Collection<KeyCacheObject> keys, GridCacheVersion base) { - Collection<GridCacheVersion> lessPending = new GridLeanSet<>(5); - - for (GridCacheMvccCandidate cand : dhtLocCands) { - if (cand.version().isLess(base)) { - if (keys.contains(cand.key())) - lessPending.add(cand.version()); - } - else - break; - } - - return lessPending; - } - - /** - * + * @param cacheCtx Cache context. * @param cand Cache lock candidate to add. * @return {@code True} if added as a result of this operation, * {@code false} if was previously added. @@ -924,24 +901,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { X.println(">>> finishFutsSize: " + finishFuts.size()); } - - /** - * @param nodeId Node ID. - * @return Filter. - */ - private IgnitePredicate<GridCacheMvccCandidate> nodeIdFilter(final UUID nodeId) { - if (nodeId == null) - return F.alwaysTrue(); - - return new P1<GridCacheMvccCandidate>() { - @Override public boolean apply(GridCacheMvccCandidate c) { - UUID otherId = c.otherNodeId(); - - return c.nodeId().equals(nodeId) || (otherId != null && otherId.equals(nodeId)); - } - }; - } - /** * @param topVer Topology version. * @return Future that signals when all locks for given partitions are released. @@ -994,6 +953,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * * @return Finish update future. */ + @SuppressWarnings("unchecked") public IgniteInternalFuture<?> finishAtomicUpdates(AffinityTopologyVersion topVer) { GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index c44b028..073737d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -135,6 +135,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @SuppressWarnings("ThrowableResultOfMethodCallIgnored") @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { if (ctx.config().getAtomicWriteOrderMode() == CLOCK) { + assert req.writeSynchronizationMode() != FULL_ASYNC : req; + // Always send reply in CLOCK ordering mode. sendNearUpdateReply(res.nodeId(), res); @@ -2243,6 +2245,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param req Request to remap. */ private void remapToNewPrimary(GridNearAtomicUpdateRequest req) { + assert req.writeSynchronizationMode() == FULL_ASYNC : req; + if (log.isDebugEnabled()) log.debug("Remapping near update request locally: " + req); @@ -2275,7 +2279,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { drRmvVals = null; } else { - assert req.operation() == DELETE; + assert req.operation() == DELETE : req; drRmvVals = req.conflictVersions(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 8a2f073..d0c8766 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -35,11 +35,9 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; -import org.jsr166.*; import javax.cache.expiry.*; import java.util.*; -import java.util.concurrent.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; @@ -124,1011 +122,1011 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> private final boolean waitTopFut; /** Remap count. */ - private AtomicInteger remapCnt; + private int remapCnt; /** State. */ private final UpdateState state; /** - * + * @param cctx Cache context. + * @param cache Cache instance. + * @param syncMode Write synchronization mode. + * @param op Update operation. + * @param keys Keys to update. + * @param vals Values or transform closure. + * @param invokeArgs Optional arguments for entry processor. + * @param conflictPutVals Conflict put values (optional). + * @param conflictRmvVals Conflict remove values (optional). + * @param retval Return value require flag. + * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result. + * @param expiryPlc Expiry policy explicitly specified for cache operation. + * @param filter Entry filter. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param skipStore Skip store flag. + * @param remapCnt Maximum number of retries. + * @param waitTopFut If {@code false} does not wait for affinity change future. */ - private class UpdateState { - /** */ - private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; + public GridNearAtomicUpdateFuture( + GridCacheContext cctx, + GridDhtAtomicCache cache, + CacheWriteSynchronizationMode syncMode, + GridCacheOperation op, + Collection<?> keys, + @Nullable Collection<?> vals, + @Nullable Object[] invokeArgs, + @Nullable Collection<GridCacheDrInfo> conflictPutVals, + @Nullable Collection<GridCacheVersion> conflictRmvVals, + final boolean retval, + final boolean rawRetval, + @Nullable ExpiryPolicy expiryPlc, + final CacheEntryPredicate[] filter, + UUID subjId, + int taskNameHash, + boolean skipStore, + int remapCnt, + boolean waitTopFut + ) { + this.rawRetval = rawRetval; - /** */ - private GridCacheVersion updVer; + assert vals == null || vals.size() == keys.size(); + assert conflictPutVals == null || conflictPutVals.size() == keys.size(); + assert conflictRmvVals == null || conflictRmvVals.size() == keys.size(); + assert subjId != null; - /** */ - private AffinityTopologyVersion remapErrTopVer; + this.cctx = cctx; + this.cache = cache; + this.syncMode = syncMode; + this.op = op; + this.keys = keys; + this.vals = vals; + this.invokeArgs = invokeArgs; + this.conflictPutVals = conflictPutVals; + this.conflictRmvVals = conflictRmvVals; + this.retval = retval; + this.expiryPlc = expiryPlc; + this.filter = filter; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.skipStore = skipStore; + this.waitTopFut = waitTopFut; - /** Mappings. */ - @GridToStringInclude - private Map<UUID, GridNearAtomicUpdateRequest> mappings; + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); - /** Error. */ - private CachePartialUpdateCheckedException err; + fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC && + cctx.config().getAtomicWriteOrderMode() == CLOCK && + !(cctx.writeThrough() && cctx.config().getInterceptor() != null); - /** Future ID. */ - private GridCacheVersion futVer; + nearEnabled = CU.isNearEnabled(cctx); - /** Completion future for a particular topology version. */ - private GridFutureAdapter<Void> topCompleteFut; + if (!waitTopFut) + remapCnt = 1; - /** */ - private Collection<KeyCacheObject> remapKeys; + this.remapCnt = remapCnt; - /** */ - private GridNearAtomicUpdateRequest singleReq; + state = new UpdateState(); + } - /** - * @param failedKeys Failed keys. - * @param topVer Topology version for failed update. - * @param err Error cause. - */ - private void addFailedKeys(Collection<KeyCacheObject> failedKeys, - AffinityTopologyVersion topVer, - Throwable err) { - CachePartialUpdateCheckedException err0 = this.err; + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + throw new UnsupportedOperationException(); + } - if (err0 == null) - err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return state.futureVersion(); + } - Collection<Object> keys = new ArrayList<>(failedKeys.size()); + /** {@inheritDoc} */ + @Override public Collection<? extends ClusterNode> nodes() { + throw new UnsupportedOperationException(); + } - for (KeyCacheObject key : failedKeys) - keys.add(key.value(cctx.cacheObjectContext(), false)); + /** + * @return {@code True} if this future should block partition map exchange. + */ + private boolean waitForPartitionExchange() { + // Wait fast-map near atomic update futures in CLOCK mode. + return fastMap; + } - err0.add(keys, err, topVer); - } + /** {@inheritDoc} */ + @Override public Collection<?> keys() { + return keys; + } - synchronized IgniteUuid futureId() { - return futVer.asGridUuid(); - } + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + state.onNodeLeft(nodeId); - synchronized GridCacheVersion futureVersion() { - return futVer; - } + return false; + } - /** - * @param nodeId Left node ID. - */ - void onNodeLeft(UUID nodeId) { - GridNearAtomicUpdateResponse res = null; + /** {@inheritDoc} */ + @Override public boolean trackable() { + return true; + } - synchronized (this) { - GridNearAtomicUpdateRequest req; + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + // No-op. + } - if (singleReq != null) - req = singleReq.nodeId().equals(nodeId) ? singleReq : null; - else - req = mappings != null ? mappings.get(nodeId) : null; + /** + * Performs future mapping. + */ + public void map() { + AffinityTopologyVersion topVer = null; - if (req != null) { - res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion()); + IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(); - res.addFailedKeys(req.keys(), - new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId)); - } - } + if (tx != null && tx.topologyVersionSnapshot() != null) + topVer = tx.topologyVersionSnapshot(); - if (res != null) - onResult(nodeId, res, true); - } + if (topVer == null) + topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); - /** - * @param ret Result from single node. - */ - @SuppressWarnings("unchecked") - private void addInvokeResults(GridCacheReturn ret) { - assert op == TRANSFORM : op; - assert ret.value() == null || ret.value() instanceof Map : ret.value(); + if (topVer == null) + mapOnTopology(); + else { + topLocked = true; - if (ret.value() != null) { - if (opRes != null) - opRes.mergeEntryProcessResults(ret); - else - opRes = ret; - } - } + // Cannot remap. + remapCnt = 1; - /** - * @param nodeId Node ID. - * @param res Response. - */ - void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { - GridNearAtomicUpdateRequest req; + state.map(topVer); + } + } - AffinityTopologyVersion errTopVer = null; + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { + if (waitForPartitionExchange()) { + GridFutureAdapter<Void> fut = state.completeFuture(topVer); - GridCacheReturn opRes0 = null; - CachePartialUpdateCheckedException err0 = null; + if (fut != null && isDone()) { + fut.onDone(); - boolean rcvAll; + return null; + } - GridFutureAdapter<?> fut0 = null; + return fut; + } - synchronized (this) { - if (!res.futureVersion().equals(futVer)) - return; + return null; + } - if (singleReq != null) { - if (!singleReq.nodeId().equals(nodeId)) - return; + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { + assert res == null || res instanceof GridCacheReturn; - req = singleReq; + GridCacheReturn ret = (GridCacheReturn)res; - singleReq = null; + Object retval = + res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? ret.value() : ret.success(); - rcvAll = true; - } - else { - req = mappings != null ? mappings.remove(nodeId) : null; + if (op == TRANSFORM && retval == null) + retval = Collections.emptyMap(); - if (req != null) - rcvAll = mappings.isEmpty(); - else - return; - } + if (super.onDone(retval, err)) { + GridCacheVersion futVer = state.onFutureDone(); - assert req != null && req.topologyVersion().equals(topVer) : req; + if (futVer != null) + cctx.mvcc().removeAtomicFuture(futVer); - if (res.remapKeys() != null) { - assert !fastMap || cctx.kernalContext().clientNode(); + return true; + } - if (remapKeys == null) - remapKeys = U.newHashSet(res.remapKeys().size()); + return false; + } - remapKeys.addAll(res.remapKeys()); - - if (remapErrTopVer == null || remapErrTopVer.compareTo(req.topologyVersion()) < 0) - remapErrTopVer = req.topologyVersion(); - } - else if (res.error() != null) { - addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error()); - } - else { - if (!req.fastMap() || req.hasPrimary()) { - GridCacheReturn ret = res.returnValue(); - - if (op == TRANSFORM) { - if (ret != null) - addInvokeResults(ret); - } - else - opRes = ret; - } - } - - if (rcvAll) { - if (remapKeys != null) { - assert remapErrTopVer != null; - - errTopVer = remapErrTopVer; - } - else { - if (err != null && - X.hasCause(err, CachePartialUpdateCheckedException.class) && - X.hasCause(err, ClusterTopologyCheckedException.class) && - storeFuture() && - remapCnt.decrementAndGet() > 0) { - ClusterTopologyCheckedException topErr = - X.cause(err, ClusterTopologyCheckedException.class); - - if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { - CachePartialUpdateCheckedException cause = - X.cause(err, CachePartialUpdateCheckedException.class); - - assert cause != null && cause.topologyVersion() != null : err; - - errTopVer = cause.topologyVersion(); - - err = null; - - Collection<Object> failedKeys = cause.failedKeys(); + /** + * Response callback. + * + * @param nodeId Node ID. + * @param res Update response. + */ + public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { + state.onResult(nodeId, res, false); + } - remapKeys = new ArrayList<>(failedKeys.size()); + /** + * Updates near cache. + * + * @param req Update request. + * @param res Update response. + */ + private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + if (!nearEnabled || !req.hasPrimary()) + return; - for (Object key : failedKeys) - remapKeys.add(cctx.toCacheKeyObject(key)); + GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); - updVer = null; - } - } - } + near.processNearAtomicUpdateResponse(req, res); + } - if (errTopVer == null) { - err0 = err; - opRes0 = opRes; - } - else { - fut0 = topCompleteFut; + /** + * Maps future on ready topology. + */ + private void mapOnTopology() { + cache.topology().readLock(); - topCompleteFut = null; + AffinityTopologyVersion topVer = null; - cctx.mvcc().removeAtomicFuture(futVer); + try { + if (cache.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cache.name())); - futVer = null; - topVer = AffinityTopologyVersion.ZERO; - } - } + return; } - if (!nodeErr && res.remapKeys() == null) - updateNear(req, res); + GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); - if (errTopVer != null) { - if (fut0 != null) - fut0.onDone(); + if (fut.isDone()) { + if (!fut.isCacheTopologyValid(cctx)) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + cctx.name())); - if (errTopVer == AffinityTopologyVersion.NONE) - mapOnTopology(); - else { - IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(errTopVer.topologyVersion() + 1); + return; + } - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(final IgniteInternalFuture<?> fut) { + topVer = fut.topologyVersion(); + } + else { + if (waitTopFut) { + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - try { - fut.get(); - - mapOnTopology(); - } - catch (IgniteCheckedException e) { - onDone(e); - } + mapOnTopology(); } }); } }); } + else + onDone(new GridCacheTryPutFailedException()); return; } - - if (rcvAll) - onDone(opRes0, err0); } - - /** - * @param req Request. - * @param e Error. - */ - void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { - synchronized (this) { - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), - req.nodeId(), - req.futureVersion()); - - res.addFailedKeys(req.keys(), e); - - onResult(req.nodeId(), res, true); - } + finally { + cache.topology().readUnlock(); } - /** - * @param topNodes Cache nodes. - * @return Mapping. - * @throws Exception If failed. - */ - Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception { - Iterator<?> it = null; + state.map(topVer); + } - if (vals != null) - it = vals.iterator(); + /** + * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. + */ + private boolean storeFuture() { + return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; + } - Iterator<GridCacheDrInfo> conflictPutValsIt = null; + /** + * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near + * node and send updates in parallel to all participating nodes. + * + * @param key Key to map. + * @param topVer Topology version to map. + * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. + * @return Collection of nodes to which key is mapped. + */ + private Collection<ClusterNode> mapKey( + KeyCacheObject key, + AffinityTopologyVersion topVer, + boolean fastMap + ) { + GridCacheAffinityManager affMgr = cctx.affinity(); - if (conflictPutVals != null) - conflictPutValsIt = conflictPutVals.iterator(); + // If we can send updates in parallel - do it. + return fastMap ? + cctx.topology().nodes(affMgr.partition(key), topVer) : + Collections.singletonList(affMgr.primary(key, topVer)); + } - Iterator<GridCacheVersion> conflictRmvValsIt = null; + /** + * Maps future to single node. + * + * @param nodeId Node ID. + * @param req Request. + */ + private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { + if (cctx.localNodeId().equals(nodeId)) { + cache.updateAllAsyncInternal(nodeId, req, + new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { + @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + onResult(res.nodeId(), res); + } + }); + } + else { + try { + if (log.isDebugEnabled()) + log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - if (conflictRmvVals != null) - conflictRmvValsIt = conflictRmvVals.iterator(); + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); - Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size()); + if (syncMode == FULL_ASYNC) + onDone(new GridCacheReturn(cctx, true, null, true)); + } + catch (IgniteCheckedException e) { + state.onSendError(req, e); + } + } + } - // Create mappings first, then send messages. - for (Object key : keys) { - if (key == null) - throw new NullPointerException("Null key."); + /** + * Sends messages to remote nodes and updates local cache. + * + * @param mappings Mappings to send. + */ + private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) { + UUID locNodeId = cctx.localNodeId(); - Object val; - GridCacheVersion conflictVer; - long conflictTtl; - long conflictExpireTime; + GridNearAtomicUpdateRequest locUpdate = null; - if (vals != null) { - val = it.next(); - conflictVer = null; - conflictTtl = CU.TTL_NOT_CHANGED; - conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + // Send messages to remote nodes first, then run local update. + for (GridNearAtomicUpdateRequest req : mappings.values()) { + if (locNodeId.equals(req.nodeId())) { + assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate + + ", req=" + req + ']'; - if (val == null) - throw new NullPointerException("Null value."); - } - else if (conflictPutVals != null) { - GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); + locUpdate = req; + } + else { + try { + if (log.isDebugEnabled()) + log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - val = conflictPutVal.value(); - conflictVer = conflictPutVal.version(); - conflictTtl = conflictPutVal.ttl(); - conflictExpireTime = conflictPutVal.expireTime(); + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); } - else if (conflictRmvVals != null) { - val = null; - conflictVer = conflictRmvValsIt.next(); - conflictTtl = CU.TTL_NOT_CHANGED; - conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; - } - else { - val = null; - conflictVer = null; - conflictTtl = CU.TTL_NOT_CHANGED; - conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + catch (IgniteCheckedException e) { + state.onSendError(req, e); } + } + } - if (val == null && op != GridCacheOperation.DELETE) - continue; + if (locUpdate != null) { + cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, + new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { + @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + onResult(res.nodeId(), res); + } + }); + } - KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + if (syncMode == FULL_ASYNC) + onDone(new GridCacheReturn(cctx, true, null, true)); + } - if (remapKeys != null && !remapKeys.contains(cacheKey)) - continue; + /** + * + */ + private class UpdateState { + /** */ + private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; - if (op != TRANSFORM) - val = cctx.toCacheObject(val); + /** */ + private GridCacheVersion updVer; - Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); + /** */ + private AffinityTopologyVersion mapErrTopVer; - if (affNodes.isEmpty()) - throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid)."); + /** Mappings. */ + @GridToStringInclude + private Map<UUID, GridNearAtomicUpdateRequest> mappings; - int i = 0; + /** Error. */ + private CachePartialUpdateCheckedException err; - for (ClusterNode affNode : affNodes) { - if (affNode == null) - throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid)."); + /** Future ID. */ + private GridCacheVersion futVer; - UUID nodeId = affNode.id(); + /** Completion future for a particular topology version. */ + private GridFutureAdapter<Void> topCompleteFut; - GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); + /** */ + private Collection<KeyCacheObject> remapKeys; - if (mapped == null) { - mapped = new GridNearAtomicUpdateRequest( - cctx.cacheId(), - nodeId, - futVer, - fastMap, - updVer, - topVer, - topLocked, - syncMode, - op, - retval, - expiryPlc, - invokeArgs, - filter, - subjId, - taskNameHash, - skipStore, - cctx.kernalContext().clientNode()); + /** */ + private GridNearAtomicUpdateRequest singleReq; - pendingMappings.put(nodeId, mapped); - } + /** + * @param failedKeys Failed keys. + * @param topVer Topology version for failed update. + * @param err Error cause. + */ + private void addFailedKeys(Collection<KeyCacheObject> failedKeys, + AffinityTopologyVersion topVer, + Throwable err) { + CachePartialUpdateCheckedException err0 = this.err; - mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0); + if (err0 == null) + err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); - i++; - } - } + Collection<Object> keys = new ArrayList<>(failedKeys.size()); - return pendingMappings; + for (KeyCacheObject key : failedKeys) + keys.add(key.value(cctx.cacheObjectContext(), false)); + + err0.add(keys, err, topVer); } /** - * @return Request. - * @throws Exception If failed. + * @return Future version. */ - GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception { - Object key = F.first(keys); - - Object val; - GridCacheVersion conflictVer; - long conflictTtl; - long conflictExpireTime; - - if (vals != null) { - // Regular PUT. - val = F.first(vals); - conflictVer = null; - conflictTtl = CU.TTL_NOT_CHANGED; - conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; - } - else if (conflictPutVals != null) { - // Conflict PUT. - GridCacheDrInfo conflictPutVal = F.first(conflictPutVals); - - val = conflictPutVal.value(); - conflictVer = conflictPutVal.version(); - conflictTtl = conflictPutVal.ttl(); - conflictExpireTime = conflictPutVal.expireTime(); - } - else if (conflictRmvVals != null) { - // Conflict REMOVE. - val = null; - conflictVer = F.first(conflictRmvVals); - conflictTtl = CU.TTL_NOT_CHANGED; - conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; - } - else { - // Regular REMOVE. - val = null; - conflictVer = null; - conflictTtl = CU.TTL_NOT_CHANGED; - conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; - } - - // We still can get here if user pass map with single element. - if (key == null) - throw new NullPointerException("Null key."); + @Nullable synchronized GridCacheVersion futureVersion() { + return futVer; + } - if (val == null && op != GridCacheOperation.DELETE) - throw new NullPointerException("Null value."); + /** + * @param nodeId Left node ID. + */ + void onNodeLeft(UUID nodeId) { + GridNearAtomicUpdateResponse res = null; - KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + synchronized (this) { + GridNearAtomicUpdateRequest req; - if (op != TRANSFORM) - val = cctx.toCacheObject(val); + if (singleReq != null) + req = singleReq.nodeId().equals(nodeId) ? singleReq : null; + else + req = mappings != null ? mappings.get(nodeId) : null; - ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); + if (req != null) { + res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion()); - if (primary == null) - throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + - "left the grid)."); + res.addFailedKeys(req.keys(), + new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId)); + } + } - GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( - cctx.cacheId(), - primary.id(), - futVer, - fastMap, - updVer, - topVer, - topLocked, - syncMode, - op, - retval, - expiryPlc, - invokeArgs, - filter, - subjId, - taskNameHash, - skipStore, - cctx.kernalContext().clientNode()); + if (res != null) + onResult(nodeId, res, true); + } - req.addUpdateEntry(cacheKey, - val, - conflictTtl, - conflictExpireTime, - conflictVer, - true); + /** + * @param ret Result from single node. + */ + @SuppressWarnings("unchecked") + private void addInvokeResults(GridCacheReturn ret) { + assert op == TRANSFORM : op; + assert ret.value() == null || ret.value() instanceof Map : ret.value(); - return req; + if (ret.value() != null) { + if (opRes != null) + opRes.mergeEntryProcessResults(ret); + else + opRes = ret; + } } /** - * @param topVer Topology version. + * @param nodeId Node ID. + * @param res Response. + * @param nodeErr {@code True} if response was created on node failure. */ - void map(AffinityTopologyVersion topVer) { - Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); + void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { + GridNearAtomicUpdateRequest req; - if (F.isEmpty(topNodes)) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + - "left the grid).")); + AffinityTopologyVersion remapTopVer = null; - return; - } + GridCacheReturn opRes0 = null; + CachePartialUpdateCheckedException err0 = null; - Exception err = null; - Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null; + boolean rcvAll; - int size = keys.size(); + GridFutureAdapter<?> fut0 = null; synchronized (this) { - assert futVer == null : this; - assert this.topVer == AffinityTopologyVersion.ZERO : this; - - this.topVer = topVer; + if (!res.futureVersion().equals(futVer)) + return; - futVer = cctx.versions().next(topVer); + if (singleReq != null) { + if (!singleReq.nodeId().equals(nodeId)) + return; - if (storeFuture()) - cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this); + req = singleReq; - // Assign version on near node in CLOCK ordering mode even if fastMap is false. - if (updVer == null) - updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null; + singleReq = null; - if (updVer != null && log.isDebugEnabled()) - log.debug("Assigned fast-map version for update on near node: " + updVer); + rcvAll = true; + } + else { + req = mappings != null ? mappings.remove(nodeId) : null; - try { - if (size == 1 && !fastMap) { - assert remapKeys == null || remapKeys.size() == 1; + if (req != null) + rcvAll = mappings.isEmpty(); + else + return; + } - singleReq = mapSingleUpdate(); + assert req != null && req.topologyVersion().equals(topVer) : req; + + if (res.remapKeys() != null) { + assert !fastMap || cctx.kernalContext().clientNode(); + + if (remapKeys == null) + remapKeys = U.newHashSet(res.remapKeys().size()); + + remapKeys.addAll(res.remapKeys()); + + if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) + mapErrTopVer = req.topologyVersion(); + } + else if (res.error() != null) + addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error()); + else { + if (!req.fastMap() || req.hasPrimary()) { + GridCacheReturn ret = res.returnValue(); + + if (op == TRANSFORM) { + if (ret != null) + addInvokeResults(ret); + } + else + opRes = ret; + } + } + + if (rcvAll) { + if (remapKeys != null) { + assert mapErrTopVer != null; + + remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1); } else { - pendingMappings = mapUpdate(topNodes); + if (err != null && + X.hasCause(err, CachePartialUpdateCheckedException.class) && + X.hasCause(err, ClusterTopologyCheckedException.class) && + storeFuture() && + --remapCnt > 0) { + ClusterTopologyCheckedException topErr = + X.cause(err, ClusterTopologyCheckedException.class); - if (pendingMappings.size() == 1) - singleReq = F.firstValue(pendingMappings); - else { - if (syncMode == PRIMARY_SYNC) { - mappings = U.newHashMap(pendingMappings.size()); + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + CachePartialUpdateCheckedException cause = + X.cause(err, CachePartialUpdateCheckedException.class); - for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { - if (req.hasPrimary()) - mappings.put(req.nodeId(), req); - } - } - else - mappings = new HashMap<>(pendingMappings); + assert cause != null && cause.topologyVersion() != null : err; - assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this; + remapTopVer = + new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); + + err = null; + + Collection<Object> failedKeys = cause.failedKeys(); + + remapKeys = new ArrayList<>(failedKeys.size()); + + for (Object key : failedKeys) + remapKeys.add(cctx.toCacheKeyObject(key)); + + updVer = null; + } } } - remapKeys = null; - } - catch (Exception e) { - err = e; + if (remapTopVer == null) { + err0 = err; + opRes0 = opRes; + } + else { + fut0 = topCompleteFut; + + topCompleteFut = null; + + cctx.mvcc().removeAtomicFuture(futVer); + + futVer = null; + topVer = AffinityTopologyVersion.ZERO; + } } } - if (err != null) { - onDone(err); + if (!nodeErr && res.remapKeys() == null) + updateNear(req, res); + + if (remapTopVer != null) { + if (fut0 != null) + fut0.onDone(); + + IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(remapTopVer); + + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(final IgniteInternalFuture<?> fut) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + try { + fut.get(); + + mapOnTopology(); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + } + }); return; } - // Optimize mapping for single key. - if (singleReq != null) - mapSingle(singleReq.nodeId(), singleReq); - else { - assert pendingMappings != null; + assert fut0 == null; - if (size == 0) - onDone(new GridCacheReturn(cctx, true, null, true)); - else - doUpdate(pendingMappings); - } + if (rcvAll) + onDone(opRes0, err0); } /** - * @param topVer Topology version. - * @return Future. + * @param req Request. + * @param e Error. */ - @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) { - if (this.topVer == AffinityTopologyVersion.ZERO) - return null; + void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { + synchronized (this) { + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + req.nodeId(), + req.futureVersion()); - if (this.topVer.compareTo(topVer) < 0) { - if (topCompleteFut == null) - topCompleteFut = new GridFutureAdapter<>(); + res.addFailedKeys(req.keys(), e); - return topCompleteFut; + onResult(req.nodeId(), res, true); } - - return null; } /** - * @return Future version. + * @param topNodes Cache nodes. + * @return Mapping. + * @throws Exception If failed. */ - GridCacheVersion onFutureDone() { - GridCacheVersion ver0; - - GridFutureAdapter<Void> fut0; + Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception { + Iterator<?> it = null; - synchronized (this) { - fut0 = topCompleteFut; + if (vals != null) + it = vals.iterator(); - topCompleteFut = null; + Iterator<GridCacheDrInfo> conflictPutValsIt = null; - ver0 = futVer; + if (conflictPutVals != null) + conflictPutValsIt = conflictPutVals.iterator(); - futVer = null; - } + Iterator<GridCacheVersion> conflictRmvValsIt = null; - if (fut0 != null) - fut0.onDone(); + if (conflictRmvVals != null) + conflictRmvValsIt = conflictRmvVals.iterator(); - return ver0; - } + Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size()); - /** {@inheritDoc} */ - @Override public synchronized String toString() { - return S.toString(UpdateState.class, this); - } - } + // Create mappings first, then send messages. + for (Object key : keys) { + if (key == null) + throw new NullPointerException("Null key."); - /** - * @param cctx Cache context. - * @param cache Cache instance. - * @param syncMode Write synchronization mode. - * @param op Update operation. - * @param keys Keys to update. - * @param vals Values or transform closure. - * @param invokeArgs Optional arguments for entry processor. - * @param conflictPutVals Conflict put values (optional). - * @param conflictRmvVals Conflict remove values (optional). - * @param retval Return value require flag. - * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result. - * @param expiryPlc Expiry policy explicitly specified for cache operation. - * @param filter Entry filter. - * @param subjId Subject ID. - * @param taskNameHash Task name hash code. - * @param skipStore Skip store flag. - */ - public GridNearAtomicUpdateFuture( - GridCacheContext cctx, - GridDhtAtomicCache cache, - CacheWriteSynchronizationMode syncMode, - GridCacheOperation op, - Collection<?> keys, - @Nullable Collection<?> vals, - @Nullable Object[] invokeArgs, - @Nullable Collection<GridCacheDrInfo> conflictPutVals, - @Nullable Collection<GridCacheVersion> conflictRmvVals, - final boolean retval, - final boolean rawRetval, - @Nullable ExpiryPolicy expiryPlc, - final CacheEntryPredicate[] filter, - UUID subjId, - int taskNameHash, - boolean skipStore, - int remapCnt, - boolean waitTopFut - ) { - this.rawRetval = rawRetval; + Object val; + GridCacheVersion conflictVer; + long conflictTtl; + long conflictExpireTime; - assert vals == null || vals.size() == keys.size(); - assert conflictPutVals == null || conflictPutVals.size() == keys.size(); - assert conflictRmvVals == null || conflictRmvVals.size() == keys.size(); - assert subjId != null; + if (vals != null) { + val = it.next(); + conflictVer = null; + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; - this.cctx = cctx; - this.cache = cache; - this.syncMode = syncMode; - this.op = op; - this.keys = keys; - this.vals = vals; - this.invokeArgs = invokeArgs; - this.conflictPutVals = conflictPutVals; - this.conflictRmvVals = conflictRmvVals; - this.retval = retval; - this.expiryPlc = expiryPlc; - this.filter = filter; - this.subjId = subjId; - this.taskNameHash = taskNameHash; - this.skipStore = skipStore; - this.waitTopFut = waitTopFut; + if (val == null) + throw new NullPointerException("Null value."); + } + else if (conflictPutVals != null) { + GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); - if (log == null) - log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); + val = conflictPutVal.value(); + conflictVer = conflictPutVal.version(); + conflictTtl = conflictPutVal.ttl(); + conflictExpireTime = conflictPutVal.expireTime(); + } + else if (conflictRmvVals != null) { + val = null; + conflictVer = conflictRmvValsIt.next(); + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + } + else { + val = null; + conflictVer = null; + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + } - fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC && - cctx.config().getAtomicWriteOrderMode() == CLOCK && - !(cctx.writeThrough() && cctx.config().getInterceptor() != null); + if (val == null && op != GridCacheOperation.DELETE) + continue; - nearEnabled = CU.isNearEnabled(cctx); + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); - if (!waitTopFut) - remapCnt = 1; + if (remapKeys != null && !remapKeys.contains(cacheKey)) + continue; - this.remapCnt = new AtomicInteger(remapCnt); + if (op != TRANSFORM) + val = cctx.toCacheObject(val); - state = new UpdateState(); - } + Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); - /** {@inheritDoc} */ - @Override public IgniteUuid futureId() { - return state.futureId(); - } + if (affNodes.isEmpty()) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid)."); - /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - return state.futureVersion(); - } + int i = 0; - /** {@inheritDoc} */ - @Override public Collection<? extends ClusterNode> nodes() { - throw new UnsupportedOperationException(); - } + for (ClusterNode affNode : affNodes) { + if (affNode == null) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid)."); - /** - * @return {@code True} if this future should block partition map exchange. - */ - private boolean waitForPartitionExchange() { - // Wait fast-map near atomic update futures in CLOCK mode. - return fastMap; - } + UUID nodeId = affNode.id(); - /** {@inheritDoc} */ - @Override public Collection<?> keys() { - return keys; - } + GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); - /** {@inheritDoc} */ - @Override public boolean onNodeLeft(UUID nodeId) { - state.onNodeLeft(nodeId); + if (mapped == null) { + mapped = new GridNearAtomicUpdateRequest( + cctx.cacheId(), + nodeId, + futVer, + fastMap, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + cctx.kernalContext().clientNode()); - return false; - } + pendingMappings.put(nodeId, mapped); + } - /** {@inheritDoc} */ - @Override public boolean trackable() { - return true; - } + mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0); - /** {@inheritDoc} */ - @Override public void markNotTrackable() { - // No-op. - } + i++; + } + } - /** - * Performs future mapping. - */ - public void map() { - AffinityTopologyVersion topVer = null; + return pendingMappings; + } - IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(); + /** + * @return Request. + * @throws Exception If failed. + */ + GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception { + Object key = F.first(keys); - if (tx != null && tx.topologyVersionSnapshot() != null) - topVer = tx.topologyVersionSnapshot(); + Object val; + GridCacheVersion conflictVer; + long conflictTtl; + long conflictExpireTime; - if (topVer == null) - topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); + if (vals != null) { + // Regular PUT. + val = F.first(vals); + conflictVer = null; + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + } + else if (conflictPutVals != null) { + // Conflict PUT. + GridCacheDrInfo conflictPutVal = F.first(conflictPutVals); - if (topVer == null) - mapOnTopology(); - else { - topLocked = true; + val = conflictPutVal.value(); + conflictVer = conflictPutVal.version(); + conflictTtl = conflictPutVal.ttl(); + conflictExpireTime = conflictPutVal.expireTime(); + } + else if (conflictRmvVals != null) { + // Conflict REMOVE. + val = null; + conflictVer = F.first(conflictRmvVals); + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + } + else { + // Regular REMOVE. + val = null; + conflictVer = null; + conflictTtl = CU.TTL_NOT_CHANGED; + conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; + } - // Cannot remap. - remapCnt.set(1); + // We still can get here if user pass map with single element. + if (key == null) + throw new NullPointerException("Null key."); - state.map(topVer); - } - } + if (val == null && op != GridCacheOperation.DELETE) + throw new NullPointerException("Null value."); - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { - if (waitForPartitionExchange()) { - GridFutureAdapter<Void> fut = state.completeFuture(topVer); + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); - if (fut != null && isDone()) { - fut.onDone(); + if (op != TRANSFORM) + val = cctx.toCacheObject(val); - return null; - } + ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); - return fut; - } + if (primary == null) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + + "left the grid)."); - return null; - } + GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( + cctx.cacheId(), + primary.id(), + futVer, + fastMap, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + cctx.kernalContext().clientNode()); - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { - assert res == null || res instanceof GridCacheReturn; + req.addUpdateEntry(cacheKey, + val, + conflictTtl, + conflictExpireTime, + conflictVer, + true); - GridCacheReturn ret = (GridCacheReturn)res; + return req; + } - Object retval = - res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? ret.value() : ret.success(); + /** + * @param topVer Topology version. + */ + void map(AffinityTopologyVersion topVer) { + Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); - if (op == TRANSFORM && retval == null) - retval = Collections.emptyMap(); + if (F.isEmpty(topNodes)) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + + "left the grid).")); - if (super.onDone(retval, err)) { - GridCacheVersion futVer = state.onFutureDone(); + return; + } - if (futVer != null) - cctx.mvcc().removeAtomicFuture(futVer); + Exception err = null; + Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null; - return true; - } + int size = keys.size(); - return false; - } + synchronized (this) { + assert futVer == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; - /** - * Response callback. - * - * @param nodeId Node ID. - * @param res Update response. - */ - public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { - state.onResult(nodeId, res, false); - } + this.topVer = topVer; - /** - * Updates near cache. - * - * @param req Update request. - * @param res Update response. - */ - private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { - if (!nearEnabled || !req.hasPrimary()) - return; + futVer = cctx.versions().next(topVer); - GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); + if (storeFuture()) + cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this); - near.processNearAtomicUpdateResponse(req, res); - } + // Assign version on near node in CLOCK ordering mode even if fastMap is false. + if (updVer == null) + updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null; - /** - * Maps future on ready topology. - */ - private void mapOnTopology() { - cache.topology().readLock(); + if (updVer != null && log.isDebugEnabled()) + log.debug("Assigned fast-map version for update on near node: " + updVer); - AffinityTopologyVersion topVer = null; + try { + if (size == 1 && !fastMap) { + assert remapKeys == null || remapKeys.size() == 1; - try { - if (cache.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - cache.name())); + singleReq = mapSingleUpdate(); + } + else { + pendingMappings = mapUpdate(topNodes); - return; - } + if (pendingMappings.size() == 1) + singleReq = F.firstValue(pendingMappings); + else { + if (syncMode == PRIMARY_SYNC) { + mappings = U.newHashMap(pendingMappings.size()); - GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); + for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { + if (req.hasPrimary()) + mappings.put(req.nodeId(), req); + } + } + else + mappings = new HashMap<>(pendingMappings); - if (fut.isDone()) { - if (!fut.isCacheTopologyValid(cctx)) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + - cctx.name())); + assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this; + } + } - return; + remapKeys = null; } + catch (Exception e) { + err = e; + } + } - topVer = fut.topologyVersion(); + if (err != null) { + onDone(err); + + return; } + + // Optimize mapping for single key. + if (singleReq != null) + mapSingle(singleReq.nodeId(), singleReq); else { - if (waitTopFut) { - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); - } - else - onDone(new GridCacheTryPutFailedException()); + assert pendingMappings != null; - return; + if (size == 0) + onDone(new GridCacheReturn(cctx, true, null, true)); + else + doUpdate(pendingMappings); } } - finally { - cache.topology().readUnlock(); - } - - state.map(topVer); - } - /** - * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. - */ - private boolean storeFuture() { - return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; - } + /** + * @param topVer Topology version. + * @return Future. + */ + @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) { + if (this.topVer == AffinityTopologyVersion.ZERO) + return null; - /** - * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near - * node and send updates in parallel to all participating nodes. - * - * @param key Key to map. - * @param topVer Topology version to map. - * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. - * @return Collection of nodes to which key is mapped. - */ - private Collection<ClusterNode> mapKey( - KeyCacheObject key, - AffinityTopologyVersion topVer, - boolean fastMap - ) { - GridCacheAffinityManager affMgr = cctx.affinity(); + if (this.topVer.compareTo(topVer) < 0) { + if (topCompleteFut == null) + topCompleteFut = new GridFutureAdapter<>(); - // If we can send updates in parallel - do it. - return fastMap ? - cctx.topology().nodes(affMgr.partition(key), topVer) : - Collections.singletonList(affMgr.primary(key, topVer)); - } + return topCompleteFut; + } - /** - * Maps future to single node. - * - * @param nodeId Node ID. - * @param req Request. - */ - private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { - if (cctx.localNodeId().equals(nodeId)) { - cache.updateAllAsyncInternal(nodeId, req, - new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { - @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { - onResult(res.nodeId(), res); - } - }); + return null; } - else { - try { - if (log.isDebugEnabled()) - log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + /** + * @return Future version. + */ + GridCacheVersion onFutureDone() { + GridCacheVersion ver0; - if (syncMode == FULL_ASYNC) - onDone(new GridCacheReturn(cctx, true, null, true)); - } - catch (IgniteCheckedException e) { - state.onSendError(req, e); - } - } - } + GridFutureAdapter<Void> fut0; - /** - * Sends messages to remote nodes and updates local cache. - * - * @param mappings Mappings to send. - */ - private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) { - UUID locNodeId = cctx.localNodeId(); + synchronized (this) { + fut0 = topCompleteFut; - GridNearAtomicUpdateRequest locUpdate = null; + topCompleteFut = null; - // Send messages to remote nodes first, then run local update. - for (GridNearAtomicUpdateRequest req : mappings.values()) { - if (locNodeId.equals(req.nodeId())) { - assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate + - ", req=" + req + ']'; + ver0 = futVer; - locUpdate = req; + futVer = null; } - else { - try { - if (log.isDebugEnabled()) - log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); - } - catch (IgniteCheckedException e) { - state.onSendError(req, e); - } - } - } + if (fut0 != null) + fut0.onDone(); - if (locUpdate != null) { - cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, - new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { - @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { - onResult(res.nodeId(), res); - } - }); + return ver0; } - if (syncMode == FULL_ASYNC) - onDone(new GridCacheReturn(cctx, true, null, true)); + /** {@inheritDoc} */ + @Override public synchronized String toString() { + return S.toString(UpdateState.class, this); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index 2064338..d56ed7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -128,7 +128,8 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> { /** * @param ignoreChildFailures Flag indicating whether compound future should ignore child futures failures. */ - public void ignoreChildFailures(Class<? extends Throwable>... ignoreChildFailures) { + @SafeVarargs + public final void ignoreChildFailures(Class<? extends Throwable>... ignoreChildFailures) { this.ignoreChildFailures = ignoreChildFailures; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java index d5c5314..fc9dad0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java @@ -421,6 +421,7 @@ public final class X { * @return {@code True} if one of the causing exception is an instance of passed in classes, * {@code false} otherwise. */ + @SafeVarargs public static boolean hasCause(@Nullable Throwable t, @Nullable Class<? extends Throwable>... cls) { if (t == null || F.isEmpty(cls)) return false;
