http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 0a816a7..930c4af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -17,9 +17,8 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; import javax.cache.expiry.ExpiryPolicy; @@ -41,36 +40,29 @@ import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedExceptio import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.lang.IgniteProductVersion; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; /** * DHT atomic cache near update future. */ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpdateFuture { - /** */ - private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4"); - /** Keys */ private Object key; /** Values. */ - @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private Object val; - /** Not null is operation is mapped to single node. */ - private GridNearAtomicAbstractUpdateRequest req; + /** */ + private PrimaryRequestState reqState; /** * @param cctx Cache context. @@ -110,8 +102,21 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda int remapCnt, boolean waitTopFut ) { - super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash, - skipStore, keepBinary, remapCnt, waitTopFut); + super(cctx, + cache, + syncMode, + op, + invokeArgs, + retval, + rawRetval, + expiryPlc, + filter, + subjId, + taskNameHash, + skipStore, + keepBinary, + remapCnt, + waitTopFut); assert subjId != null; @@ -120,52 +125,63 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } /** {@inheritDoc} */ - @Override public GridCacheVersion version() { + @Override public Long id() { synchronized (mux) { - return futVer; + return futId; } } /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - GridNearAtomicUpdateResponse res = null; + GridCacheReturn opRes0 = null; + CachePartialUpdateCheckedException err0 = null; + AffinityTopologyVersion remapTopVer0 = null; - GridNearAtomicAbstractUpdateRequest req; + GridNearAtomicCheckUpdateRequest checkReq = null; + + boolean rcvAll = false; synchronized (mux) { - req = this.req != null && this.req.nodeId().equals(nodeId) ? this.req : null; + if (reqState == null) + return false; - if (req != null && req.response() == null) { - res = new GridNearAtomicUpdateResponse(cctx.cacheId(), - nodeId, - req.futureVersion(), - cctx.deploymentEnabled()); + if (reqState.req.nodeId.equals(nodeId)) { + GridNearAtomicAbstractUpdateRequest req = reqState.onPrimaryFail(); + + if (req != null) { + GridNearAtomicUpdateResponse res = primaryFailedResponse(req); - ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " + - "before response is received: " + nodeId); + rcvAll = true; - e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion())); + reqState.onPrimaryResponse(res, cctx); - res.addFailedKeys(req.keys(), e); + onPrimaryError(req, res); + } } - } + else { + DhtLeftResult res = reqState.onDhtNodeLeft(nodeId); - if (res != null) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("Near update single fut, node left [futId=" + req.futureVersion() + - ", writeVer=" + req.updateVersion() + - ", node=" + nodeId + ']'); + if (res == DhtLeftResult.DONE) + rcvAll = true; + else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) + checkReq = new GridNearAtomicCheckUpdateRequest(reqState.req); + else + return false; } - onResult(nodeId, res, true); + if (rcvAll) { + opRes0 = opRes; + err0 = err; + remapTopVer0 = onAllReceived(); + } } - return false; - } + if (checkReq != null) + sendCheckUpdateRequest(checkReq); + else if (rcvAll) + finishUpdateFuture(opRes0, err0, remapTopVer0); - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { - return null; + return false; } /** {@inheritDoc} */ @@ -175,15 +191,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda GridCacheReturn ret = (GridCacheReturn)res; - Object retval = - res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? - cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success(); + Object retval = res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? + cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success(); if (op == TRANSFORM && retval == null) retval = Collections.emptyMap(); if (super.onDone(retval, err)) { - GridCacheVersion futVer = onFutureDone(); + Long futVer = onFutureDone(); if (futVer != null) cctx.mvcc().removeAtomicFuture(futVer); @@ -195,112 +210,103 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } /** {@inheritDoc} */ + @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) { + GridCacheReturn opRes0; + CachePartialUpdateCheckedException err0; + AffinityTopologyVersion remapTopVer0; + + synchronized (mux) { + if (futId == null || futId != res.futureId()) + return; + + assert reqState != null; + assert reqState.req.nodeId().equals(res.primaryId()); + + if (opRes == null && res.hasResult()) + opRes = res.result(); + + if (reqState.onDhtResponse(nodeId, res)) { + opRes0 = opRes; + err0 = err; + remapTopVer0 = onAllReceived(); + } + else + return; + } + + UpdateErrors errors = res.errors(); + + if (errors != null) { + assert errors.error() != null; + + onDone(errors.error()); + + return; + } + + finishUpdateFuture(opRes0, err0, remapTopVer0); + } + + /** {@inheritDoc} */ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) - @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { + @Override public void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { GridNearAtomicAbstractUpdateRequest req; - AffinityTopologyVersion remapTopVer = null; + AffinityTopologyVersion remapTopVer0; GridCacheReturn opRes0 = null; CachePartialUpdateCheckedException err0 = null; - GridFutureAdapter<?> fut0 = null; - synchronized (mux) { - if (!res.futureVersion().equals(futVer)) + if (futId == null || futId != res.futureId()) return; - if (!this.req.nodeId().equals(nodeId)) - return; + req = reqState.processPrimaryResponse(nodeId, res); - req = this.req; - - this.req = null; + if (req == null) + return; - boolean remapKey = !F.isEmpty(res.remapKeys()); + boolean remapKey = res.remapTopologyVersion() != null; if (remapKey) { - if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) - mapErrTopVer = req.topologyVersion(); - } - else if (res.error() != null) { - if (res.failedKeys() != null) { - if (err == null) - err = new CachePartialUpdateCheckedException( - "Failed to update keys (retry update if possible)."); - - Collection<Object> keys = new ArrayList<>(res.failedKeys().size()); + assert !req.topologyVersion().equals(res.remapTopologyVersion()); - for (KeyCacheObject key : res.failedKeys()) - keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); + assert remapTopVer == null : remapTopVer; - err.add(keys, res.error(), req.topologyVersion()); - } + remapTopVer = res.remapTopologyVersion(); } + else if (res.error() != null) + onPrimaryError(req, res); else { - if (!req.fastMap() || req.hasPrimary()) { - GridCacheReturn ret = res.returnValue(); - - if (op == TRANSFORM) { - if (ret != null) { - assert ret.value() == null || ret.value() instanceof Map : ret.value(); - - if (ret.value() != null) { - if (opRes != null) - opRes.mergeEntryProcessResults(ret); - else - opRes = ret; - } + GridCacheReturn ret = res.returnValue(); + + if (op == TRANSFORM) { + if (ret != null) { + assert ret.value() == null || ret.value() instanceof Map : ret.value(); + + if (ret.value() != null) { + if (opRes != null) + opRes.mergeEntryProcessResults(ret); + else + opRes = ret; } } - else - opRes = ret; } - } + else + opRes = ret; - if (remapKey) { - assert mapErrTopVer != null; + assert reqState != null; - remapTopVer = cctx.shared().exchange().topologyVersion(); + if (!reqState.onPrimaryResponse(res, cctx)) + return; } - else { - if (err != null && - X.hasCause(err, CachePartialUpdateCheckedException.class) && - X.hasCause(err, ClusterTopologyCheckedException.class) && - storeFuture() && - --remapCnt > 0) { - ClusterTopologyCheckedException topErr = - X.cause(err, ClusterTopologyCheckedException.class); - - if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { - CachePartialUpdateCheckedException cause = - X.cause(err, CachePartialUpdateCheckedException.class); - assert cause != null && cause.topologyVersion() != null : err; + remapTopVer0 = onAllReceived(); - remapTopVer = - new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); - - err = null; - updVer = null; - } - } - } - - if (remapTopVer == null) { + if (remapTopVer0 == null) { err0 = err; opRes0 = opRes; } - else { - fut0 = topCompleteFut; - - topCompleteFut = null; - - cctx.mvcc().removeAtomicFuture(futVer); - - futVer = null; - topVer = AffinityTopologyVersion.ZERO; - } } if (res.error() != null && res.failedKeys() == null) { @@ -309,55 +315,102 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda return; } + if (remapTopVer0 != null) { + waitAndRemap(remapTopVer0); + + return; + } + if (nearEnabled && !nodeErr) updateNear(req, res); - if (remapTopVer != null) { - if (fut0 != null) - fut0.onDone(); + onDone(opRes0, err0); + } - if (!waitTopFut) { - onDone(new GridCacheTryPutFailedException()); + /** + * @return Non-null topology version if update should be remapped. + */ + private AffinityTopologyVersion onAllReceived() { + assert futId != null; - return; + AffinityTopologyVersion remapTopVer0 = null; + + if (remapTopVer == null) { + if (err != null && + X.hasCause(err, CachePartialUpdateCheckedException.class) && + X.hasCause(err, ClusterTopologyCheckedException.class) && + storeFuture() && + --remapCnt > 0) { + ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class); + + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + CachePartialUpdateCheckedException cause = + X.cause(err, CachePartialUpdateCheckedException.class); + + assert cause != null && cause.topologyVersion() != null : err; + + remapTopVer0 = new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); + + err = null; + } } + } + else + remapTopVer0 = remapTopVer; - if (topLocked) { - CachePartialUpdateCheckedException e = - new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + if (remapTopVer0 != null) { + cctx.mvcc().removeAtomicFuture(futId); - ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( - "Failed to update keys, topology changed while execute atomic update inside transaction."); + reqState = null; + futId = null; + topVer = AffinityTopologyVersion.ZERO; - cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); + remapTopVer = null; + } - e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause); + return remapTopVer0; + } - onDone(e); + /** + * @param remapTopVer New topology version. + */ + private void waitAndRemap(AffinityTopologyVersion remapTopVer) { + if (!waitTopFut) { + onDone(new GridCacheTryPutFailedException()); - return; - } + return; + } - IgniteInternalFuture<AffinityTopologyVersion> fut = - cctx.shared().exchange().affinityReadyFuture(remapTopVer); + if (topLocked) { + CachePartialUpdateCheckedException e = + new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); - if (fut == null) - fut = new GridFinishedFuture<>(remapTopVer); + ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( + "Failed to update keys, topology changed while execute atomic update inside transaction."); - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); + cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); + + e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause); + + onDone(e); return; } - onDone(opRes0, err0); + IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.shared().exchange().affinityReadyFuture(remapTopVer); + + if (fut == null) + fut = new GridFinishedFuture<>(remapTopVer); + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(); + } + }); + } + }); } /** @@ -369,7 +422,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { assert nearEnabled; - if (res.remapKeys() != null || !req.hasPrimary()) + if (res.remapTopologyVersion() != null) return; GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); @@ -380,103 +433,74 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda /** {@inheritDoc} */ @Override protected void mapOnTopology() { AffinityTopologyVersion topVer; - GridCacheVersion futVer; - - cache.topology().readLock(); - try { - if (cache.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - cache.name())); + if (cache.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cache.name())); - return; - } - - GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); + return; + } - if (fut.isDone()) { - Throwable err = fut.validateCache(cctx); + GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); - if (err != null) { - onDone(err); + if (fut.isDone()) { + Throwable err = fut.validateCache(cctx); - return; - } - - topVer = fut.topologyVersion(); - - futVer = addAtomicFuture(topVer); - } - else { - if (waitTopFut) { - assert !topLocked : this; - - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); - } - else - onDone(new GridCacheTryPutFailedException()); + if (err != null) { + onDone(err); return; } - } - finally { - cache.topology().readUnlock(); - } - - if (futVer != null) - map(topVer, futVer); - } - /** {@inheritDoc} */ - @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) { - Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); - - if (F.isEmpty(topNodes)) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + - "left the grid).")); + topVer = fut.topologyVersion(); + } + else { + if (waitTopFut) { + assert !topLocked : this; + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(); + } + }); + } + }); + } + else + onDone(new GridCacheTryPutFailedException()); return; } - GridCacheVersion updVer; - - // Assign version on near node in CLOCK ordering mode even if fastMap is false. - if (cctx.config().getAtomicWriteOrderMode() == CLOCK) { - updVer = this.updVer; - - if (updVer == null) { - updVer = futVer; + map(topVer); + } - if (log.isDebugEnabled()) - log.debug("Assigned fast-map version for update on near node: " + updVer); - } - } - else - updVer = null; + /** {@inheritDoc} */ + @Override protected void map(AffinityTopologyVersion topVer) { + long futId = cctx.mvcc().atomicFutureId(); Exception err = null; - GridNearAtomicAbstractUpdateRequest singleReq0 = null; + PrimaryRequestState reqState0 = null; try { - singleReq0 = mapSingleUpdate(topVer, futVer, updVer); + reqState0 = mapSingleUpdate(topVer, futId); synchronized (mux) { - assert this.futVer == futVer || (this.isDone() && this.error() != null); - assert this.topVer == topVer; + assert this.futId == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; + + this.topVer = topVer; + this.futId = futId; - this.updVer = updVer; + reqState = reqState0; + } - resCnt = 0; + if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) { + assert isDone(); - req = singleReq0; + return; } } catch (Exception e) { @@ -490,43 +514,80 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } // Optimize mapping for single key. - mapSingle(singleReq0.nodeId(), singleReq0); + sendSingleRequest(reqState0.req.nodeId(), reqState0.req); + + if (syncMode == FULL_ASYNC) { + onDone(new GridCacheReturn(cctx, true, true, null, true)); + + return; + } + + if (reqState0.req.initMappingLocally() && (cctx.discovery().topologyVersion() != topVer.topologyVersion())) + checkDhtNodes(futId); } /** - * @return Future version. + * @param futId + * @return */ - private GridCacheVersion onFutureDone() { - GridCacheVersion ver0; + private boolean checkDhtNodes(Long futId) { + GridCacheReturn opRes0 = null; + CachePartialUpdateCheckedException err0 = null; + AffinityTopologyVersion remapTopVer0 = null; - GridFutureAdapter<Void> fut0; + GridNearAtomicCheckUpdateRequest checkReq = null; synchronized (mux) { - fut0 = topCompleteFut; + if (this.futId == null || !this.futId.equals(futId)) + return false; - topCompleteFut = null; + assert reqState != null; - ver0 = futVer; + DhtLeftResult res = reqState.checkDhtNodes(cctx); - futVer = null; + if (res == DhtLeftResult.DONE) { + opRes0 = opRes; + err0 = err; + remapTopVer0 = onAllReceived(); + } + else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY){ + checkReq = new GridNearAtomicCheckUpdateRequest(reqState.req); + } + else + return true; } - if (fut0 != null) - fut0.onDone(); + if (checkReq != null) + sendCheckUpdateRequest(checkReq); + else + finishUpdateFuture(opRes0, err0, remapTopVer0); + + return false; + } + + /** + * @return Future ID. + */ + private Long onFutureDone() { + Long id0; + + synchronized (mux) { + id0 = futId; - return ver0; + futId = null; + } + + return id0; } /** * @param topVer Topology version. - * @param futVer Future version. - * @param updVer Update version. + * @param futId Future ID. * @return Request. * @throws Exception If failed. */ - private GridNearAtomicAbstractUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, - GridCacheVersion futVer, - @Nullable GridCacheVersion updVer) throws Exception { + private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, long futId) + throws Exception { if (key == null) throw new NullPointerException("Null key."); @@ -542,22 +603,27 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda else val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val); - ClusterNode primary = cctx.affinity().primaryByKey(cacheKey, topVer); + boolean mappingKnown = cctx.topology().rebalanceFinished(topVer) && + !cctx.discovery().hasNearCache(cctx.cacheId(), topVer); + + List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer); - if (primary == null) + if (F.isEmpty(nodes)) throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + "left the grid)."); + ClusterNode primary = nodes.get(0); + + boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1; + GridNearAtomicAbstractUpdateRequest req; - if (canUseSingleRequest(primary)) { + if (canUseSingleRequest()) { if (op == TRANSFORM) { req = new GridNearAtomicSingleUpdateInvokeRequest( cctx.cacheId(), primary.id(), - futVer, - false, - updVer, + futId, topVer, topLocked, syncMode, @@ -566,9 +632,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda invokeArgs, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, - cctx.kernalContext().clientNode(), cctx.deploymentEnabled()); } else { @@ -576,9 +642,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda req = new GridNearAtomicSingleUpdateRequest( cctx.cacheId(), primary.id(), - futVer, - false, - updVer, + futId, topVer, topLocked, syncMode, @@ -586,18 +650,16 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda retval, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, - cctx.kernalContext().clientNode(), cctx.deploymentEnabled()); } else { req = new GridNearAtomicSingleUpdateFilterRequest( cctx.cacheId(), primary.id(), - futVer, - false, - updVer, + futId, topVer, topLocked, syncMode, @@ -606,9 +668,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda filter, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, - cctx.kernalContext().clientNode(), cctx.deploymentEnabled()); } } @@ -617,9 +679,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda req = new GridNearAtomicFullUpdateRequest( cctx.cacheId(), primary.id(), - futVer, - false, - updVer, + futId, topVer, topLocked, syncMode, @@ -630,9 +690,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda filter, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, - cctx.kernalContext().clientNode(), cctx.deploymentEnabled(), 1); } @@ -641,18 +701,39 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda val, CU.TTL_NOT_CHANGED, CU.EXPIRE_TIME_CALCULATE, - null, - true); + null); + + return new PrimaryRequestState(req, nodes, true); + } + + /** + * @param opRes Operation result. + * @param err Operation error. + * @param remapTopVer Not-null topology version if need remap update. + */ + private void finishUpdateFuture(GridCacheReturn opRes, + CachePartialUpdateCheckedException err, + @Nullable AffinityTopologyVersion remapTopVer) { + if (remapTopVer != null) { + waitAndRemap(remapTopVer); + + return; + } + + if (nearEnabled) { + assert reqState.req.response() != null; + + updateNear(reqState.req, reqState.req.response()); + } - return req; + onDone(opRes, err); } /** - * @param node Target node * @return {@code True} can use 'single' update requests. */ - private boolean canUseSingleRequest(ClusterNode node) { - return expiryPlc == null && node != null && node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0; + private boolean canUseSingleRequest() { + return expiryPlc == null; } /** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java index 6582063..f8b3984 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java @@ -76,9 +76,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl * * @param cacheId Cache ID. * @param nodeId Node ID. - * @param futVer Future version. - * @param fastMap Fast map scheme flag. - * @param updateVer Update version set if fast map is performed. + * @param futId Future ID. * @param topVer Topology version. * @param topLocked Topology locked flag. * @param syncMode Synchronization mode. @@ -89,15 +87,12 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl * @param taskNameHash Task name hash code. * @param skipStore Skip write-through to a persistent storage. * @param keepBinary Keep binary flag. - * @param clientReq Client node request flag. * @param addDepInfo Deployment info flag. */ GridNearAtomicSingleUpdateInvokeRequest( int cacheId, UUID nodeId, - GridCacheVersion futVer, - boolean fastMap, - @Nullable GridCacheVersion updateVer, + long futId, @NotNull AffinityTopologyVersion topVer, boolean topLocked, CacheWriteSynchronizationMode syncMode, @@ -106,17 +101,15 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl @Nullable Object[] invokeArgs, @Nullable UUID subjId, int taskNameHash, + boolean needPrimaryRes, boolean skipStore, boolean keepBinary, - boolean clientReq, boolean addDepInfo ) { super( cacheId, nodeId, - futVer, - fastMap, - updateVer, + futId, topVer, topLocked, syncMode, @@ -124,14 +117,15 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl retval, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, - clientReq, addDepInfo ); - this.invokeArgs = invokeArgs; assert op == TRANSFORM : op; + + this.invokeArgs = invokeArgs; } /** @@ -140,14 +134,12 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl * @param conflictTtl Conflict TTL (optional). * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). - * @param primary If given key is primary on this mapping. */ @Override public void addUpdateEntry(KeyCacheObject key, @Nullable Object val, long conflictTtl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer, - boolean primary) { + @Nullable GridCacheVersion conflictVer) { assert conflictTtl < 0 : conflictTtl; assert conflictExpireTime < 0 : conflictExpireTime; assert conflictVer == null : conflictVer; @@ -156,9 +148,6 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl entryProcessor = (EntryProcessor<Object, Object, Object>)val; this.key = key; - partId = key.partition(); - - hasPrimary(hasPrimary() | primary); } /** {@inheritDoc} */ @@ -246,13 +235,13 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl } switch (writer.state()) { - case 14: + case 12: if (!writer.writeByteArray("entryProcessorBytes", entryProcessorBytes)) return false; writer.incrementState(); - case 15: + case 13: if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) return false; @@ -274,7 +263,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl return false; switch (reader.state()) { - case 14: + case 12: entryProcessorBytes = reader.readByteArray("entryProcessorBytes"); if (!reader.isLastRead()) @@ -282,7 +271,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl reader.incrementState(); - case 15: + case 13: invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); if (!reader.isLastRead()) @@ -297,7 +286,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 16; + return 14; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java index c3e9fbe..b9a1fc6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java @@ -57,9 +57,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin /** Value to update. */ protected CacheObject val; - /** Partition of key. */ - protected int partId; - /** * Empty constructor required by {@link Externalizable}. */ @@ -72,9 +69,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin * * @param cacheId Cache ID. * @param nodeId Node ID. - * @param futVer Future version. - * @param fastMap Fast map scheme flag. - * @param updateVer Update version set if fast map is performed. + * @param futId Future ID. * @param topVer Topology version. * @param topLocked Topology locked flag. * @param syncMode Synchronization mode. @@ -84,15 +79,12 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin * @param taskNameHash Task name hash code. * @param skipStore Skip write-through to a persistent storage. * @param keepBinary Keep binary flag. - * @param clientReq Client node request flag. * @param addDepInfo Deployment info flag. */ GridNearAtomicSingleUpdateRequest( int cacheId, UUID nodeId, - GridCacheVersion futVer, - boolean fastMap, - @Nullable GridCacheVersion updateVer, + long futId, @NotNull AffinityTopologyVersion topVer, boolean topLocked, CacheWriteSynchronizationMode syncMode, @@ -100,17 +92,14 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin boolean retval, @Nullable UUID subjId, int taskNameHash, + boolean needPrimaryRes, boolean skipStore, boolean keepBinary, - boolean clientReq, boolean addDepInfo ) { - super( - cacheId, + super(cacheId, nodeId, - futVer, - fastMap, - updateVer, + futId, topVer, topLocked, syncMode, @@ -118,16 +107,17 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin retval, subjId, taskNameHash, + needPrimaryRes, skipStore, keepBinary, - clientReq, - addDepInfo - ); + addDepInfo); } /** {@inheritDoc} */ @Override public int partition() { - return partId; + assert key != null; + + return key.partition(); } /** @@ -136,14 +126,12 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin * @param conflictTtl Conflict TTL (optional). * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). - * @param primary If given key is primary on this mapping. */ @Override public void addUpdateEntry(KeyCacheObject key, @Nullable Object val, long conflictTtl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer, - boolean primary) { + @Nullable GridCacheVersion conflictVer) { assert op != TRANSFORM; assert val != null || op == DELETE; assert conflictTtl < 0 : conflictTtl; @@ -151,19 +139,18 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin assert conflictVer == null : conflictVer; this.key = key; - partId = key.partition(); if (val != null) { assert val instanceof CacheObject : val; this.val = (CacheObject)val; } - - hasPrimary(hasPrimary() | primary); } /** {@inheritDoc} */ @Override public int size() { + assert key != null; + return key == null ? 0 : 1; } @@ -253,8 +240,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin if (val != null) val.finishUnmarshal(cctx.cacheObjectContext(), ldr); - - key.partition(partId); } /** {@inheritDoc} */ @@ -272,19 +257,13 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin } switch (writer.state()) { - case 11: + case 10: if (!writer.writeMessage("key", key)) return false; writer.incrementState(); - case 12: - if (!writer.writeInt("partId", partId)) - return false; - - writer.incrementState(); - - case 13: + case 11: if (!writer.writeMessage("val", val)) return false; @@ -306,7 +285,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin return false; switch (reader.state()) { - case 11: + case 10: key = reader.readMessage("key"); if (!reader.isLastRead()) @@ -314,15 +293,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin reader.incrementState(); - case 12: - partId = reader.readInt("partId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 13: + case 11: val = reader.readMessage("val"); if (!reader.isLastRead()) @@ -350,7 +321,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 14; + return 12; } /** {@inheritDoc} */
