Repository: ignite Updated Branches: refs/heads/ignite-4705-1 11d0b8423 -> 5215ed4ca
ignite-4705 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19c340ce Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19c340ce Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19c340ce Branch: refs/heads/ignite-4705-1 Commit: 19c340ce21f013dce0155e93a6b7fe89adbd1def Parents: 9e93f19 Author: sboikov <[email protected]> Authored: Thu Mar 2 11:20:15 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Mar 2 11:20:15 2017 +0300 ---------------------------------------------------------------------- .../GridNearAtomicAbstractUpdateFuture.java | 54 +++-------- .../GridNearAtomicSingleUpdateFuture.java | 91 +++++++++--------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 98 ++++++++++---------- 3 files changed, 103 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/19c340ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 9f7512c..204e510 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -212,18 +212,14 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt // Cannot remap. remapCnt = 1; - Long futId = addAtomicFuture(topVer); - - if (futId != null) - map(topVer, futId); + map(topVer); } } /** * @param topVer Topology version. - * @param futId Future ID. */ - protected abstract void map(AffinityTopologyVersion topVer, Long futId); + protected abstract void map(AffinityTopologyVersion topVer); /** * Maps future on ready topology. @@ -248,7 +244,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt /** * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. */ - protected boolean storeFuture() { + final boolean storeFuture() { return syncMode != FULL_ASYNC; } @@ -258,7 +254,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt * @param nodeId Node ID. * @param req Request. */ - protected void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { + final void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { if (cctx.localNodeId().equals(nodeId)) { cache.updateAllAsyncInternal(nodeId, req, new GridDhtAtomicCache.UpdateReplyClosure() { @@ -318,43 +314,15 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt * @param req Request. * @param e Error. */ - protected final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) { - synchronized (mux) { - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), - req.nodeId(), - req.futureId(), - cctx.deploymentEnabled()); - - res.addFailedKeys(req.keys(), e); - - onPrimaryResponse(req.nodeId(), res, true); - } - } + final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) { + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + req.nodeId(), + req.futureId(), + cctx.deploymentEnabled()); - /** - * Adds future prevents topology change before operation complete. - * Should be invoked before topology lock released. - * - * @param topVer Topology version. - * @return Future ID in case future added. - */ - final Long addAtomicFuture(AffinityTopologyVersion topVer) { - // TODO IGNITE-4705: it seems no need to add future inside read lock. - - Long futId = cctx.mvcc().atomicFutureId(); - - synchronized (mux) { - assert this.futId == null : this; - assert this.topVer == AffinityTopologyVersion.ZERO : this; - - this.topVer = topVer; - this.futId = futId; - } - - if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) - return null; + res.addFailedKeys(req.keys(), e); - return futId; + onPrimaryResponse(req.nodeId(), res, true); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/19c340ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 7a18328..b1b951f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -454,65 +454,55 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda /** {@inheritDoc} */ @Override protected void mapOnTopology() { - // TODO IGNITE-4705: primary should block topology change, so it seems read lock is not needed. - cache.topology().readLock(); - AffinityTopologyVersion topVer; - Long futId; - - try { - if (cache.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - cache.name())); - - return; - } - GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); + if (cache.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cache.name())); - if (fut.isDone()) { - Throwable err = fut.validateCache(cctx); - - if (err != null) { - onDone(err); + return; + } - return; - } + GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); - topVer = fut.topologyVersion(); + if (fut.isDone()) { + Throwable err = fut.validateCache(cctx); - futId = addAtomicFuture(topVer); - } - else { - if (waitTopFut) { - assert !topLocked : this; - - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); - } - else - onDone(new GridCacheTryPutFailedException()); + if (err != null) { + onDone(err); return; } + + topVer = fut.topologyVersion(); } - finally { - cache.topology().readUnlock(); + else { + if (waitTopFut) { + assert !topLocked : this; + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(); + } + }); + } + }); + } + else + onDone(new GridCacheTryPutFailedException()); + + return; } - if (futId != null) - map(topVer, futId); + map(topVer); } /** {@inheritDoc} */ - @Override protected void map(AffinityTopologyVersion topVer, Long futId) { + @Override protected void map(AffinityTopologyVersion topVer) { + Long futId = cctx.mvcc().atomicFutureId(); + Exception err = null; GridNearAtomicAbstractUpdateRequest singleReq0 = null; @@ -520,11 +510,20 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda singleReq0 = mapSingleUpdate(topVer, futId); synchronized (mux) { - assert this.futId.equals(futId) || (this.isDone() && this.error() != null); - assert this.topVer == topVer; + assert this.futId == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; + + this.topVer = topVer; + this.futId = futId; reqState = new PrimaryRequestState(singleReq0); } + + if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) { + assert isDone(); + + return; + } } catch (Exception e) { err = e; http://git-wip-us.apache.org/repos/asf/ignite/blob/19c340ce/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 89b2573..573cb40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -687,59 +687,47 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu @Override protected void mapOnTopology() { AffinityTopologyVersion topVer; - Long futId; + if (cache.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cache.name())); - cache.topology().readLock(); - - try { - if (cache.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - cache.name())); - - return; - } - - GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); - - if (fut.isDone()) { - Throwable err = fut.validateCache(cctx); - - if (err != null) { - onDone(err); + return; + } - return; - } + GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); - topVer = fut.topologyVersion(); + if (fut.isDone()) { + Throwable err = fut.validateCache(cctx); - futId = addAtomicFuture(topVer); - } - else { - if (waitTopFut) { - assert !topLocked : this; - - fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - mapOnTopology(); - } - }); - } - }); - } - else - onDone(new GridCacheTryPutFailedException()); + if (err != null) { + onDone(err); return; } + + topVer = fut.topologyVersion(); } - finally { - cache.topology().readUnlock(); + else { + if (waitTopFut) { + assert !topLocked : this; + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(); + } + }); + } + }); + } + else + onDone(new GridCacheTryPutFailedException()); + + return; } - if (futId != null) - map(topVer, futId, remapKeys); + map(topVer, remapKeys); } /** @@ -799,18 +787,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** {@inheritDoc} */ - @Override protected void map(AffinityTopologyVersion topVer, Long futId) { - map(topVer, futId, null); + @Override protected void map(AffinityTopologyVersion topVer) { + map(topVer, null); } /** * @param topVer Topology version. - * @param futId Future ID. * @param remapKeys Keys to remap. */ - void map(AffinityTopologyVersion topVer, - Long futId, - @Nullable Collection<KeyCacheObject> remapKeys) { + void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) { Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { @@ -820,6 +805,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu return; } + Long futId = cctx.mvcc().atomicFutureId(); + Exception err = null; PrimaryRequestState singleReq0 = null; Map<UUID, PrimaryRequestState> mappings0 = null; @@ -848,8 +835,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } synchronized (mux) { - assert this.futId.equals(futId) || (this.isDone() && this.error() != null); - assert this.topVer == topVer; + assert this.futId == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; + + this.topVer = topVer; + this.futId = futId; resCnt = 0; @@ -858,6 +848,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu this.remapKeys = null; } + + if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) { + assert isDone(); + + return; + } } catch (Exception e) { err = e;
