Repository: ignite
Updated Branches:
  refs/heads/ignite-1537 111e44743 -> 1e9d9cc17
ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1e9d9cc1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1e9d9cc1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1e9d9cc1 Branch: refs/heads/ignite-1537 Commit: 1e9d9cc1785afc9050c2af936a0d165f7dbafd73 Parents: 111e447 Author: sboikov <[email protected]> Authored: Mon Dec 21 15:24:20 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 21 15:24:20 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheContext.java | 12 +++++ .../processors/cache/GridCacheMvccManager.java | 51 +++++++++++++------- .../dht/atomic/GridDhtAtomicCache.java | 9 ++-- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 12 +++-- 5 files changed, 59 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1e9d9cc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index c10ebf3..0c2f67c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -244,6 +244,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** */ private boolean deferredDel; + /** */ + private boolean marshallerCache; + /** * Empty constructor required for {@link Externalizable}. 
*/ @@ -361,6 +364,15 @@ public class GridCacheContext<K, V> implements Externalizable { expiryPlc = null; itHolder = new CacheWeakQueryIteratorsHolder(log); + + marshallerCache = cacheType == CacheType.MARSHALLER; + } + + /** + * @return {@code True} if marshaller cache. + */ + public boolean marshallerCache() { + return marshallerCache; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1e9d9cc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index dbc6992..46bd093 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -106,6 +106,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>(); + /** Pending atomic futures. 
*/ + private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> sysCacheAtomicFuts = + new ConcurrentHashMap8<>(); + /** */ private final ConcurrentMap<IgniteUuid, GridCacheFuture<?>> futs = new ConcurrentHashMap8<>(); @@ -215,18 +219,28 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { for (GridCacheFuture<?> fut : activeFutures()) fut.onNodeLeft(discoEvt.eventNode().id()); - for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) { - cacheFut.onNodeLeft(discoEvt.eventNode().id()); + onNodeLeft(discoEvt.eventNode().id(), sysCacheAtomicFuts); - if (cacheFut.isCancelled() || cacheFut.isDone()) { - GridCacheVersion futVer = cacheFut.version(); + onNodeLeft(discoEvt.eventNode().id(), atomicFuts); + } + }; - if (futVer != null) - atomicFuts.remove(futVer, cacheFut); - } + /** + * @param nodeId Failed node ID. + * @param atomicFuts Futures collection. + */ + private void onNodeLeft(UUID nodeId, ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts) { + for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) { + cacheFut.onNodeLeft(nodeId); + + if (cacheFut.isCancelled() || cacheFut.isDone()) { + GridCacheVersion futVer = cacheFut.version(); + + if (futVer != null) + atomicFuts.remove(futVer, cacheFut); } } - }; + } /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { @@ -350,7 +364,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { for (GridCacheFuture<?> fut : activeFutures()) ((GridFutureAdapter)fut).onDone(err); - for (GridCacheAtomicFuture<?> future : atomicFuts.values()) + for (GridCacheAtomicFuture<?> future : atomicFutures()) ((GridFutureAdapter)future).onDone(err); } @@ -390,11 +404,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** * @param futVer Future ID. + * @param sysCache If {@code true} uses special futures collection. * @param fut Future. 
* @return {@code False} if future was forcibly completed with error. */ - public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) { - IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut); + public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut, boolean sysCache) { + IgniteInternalFuture<?> old = sysCache ? sysCacheAtomicFuts.put(futVer, fut) : atomicFuts.put(futVer, fut); assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']'; @@ -405,25 +420,27 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @return Collection of pending atomic futures. */ public Collection<GridCacheAtomicFuture<?>> atomicFutures() { - return atomicFuts.values(); + return F.concat(false, sysCacheAtomicFuts.values(), atomicFuts.values()); } /** * Gets future by given future ID. * * @param futVer Future ID. + * @param sysCache If {@code true} uses special futures collection. * @return Future. */ - @Nullable public IgniteInternalFuture<?> atomicFuture(GridCacheVersion futVer) { - return atomicFuts.get(futVer); + @Nullable public IgniteInternalFuture<?> atomicFuture(GridCacheVersion futVer, boolean sysCache) { + return sysCache ? sysCacheAtomicFuts.get(futVer) : atomicFuts.get(futVer); } /** * @param futVer Future ID. + * @param sysCache If {@code true} uses special futures collection. * @return Removed future. */ - @Nullable public IgniteInternalFuture<?> removeAtomicFuture(GridCacheVersion futVer) { - return atomicFuts.remove(futVer); + @Nullable public IgniteInternalFuture<?> removeAtomicFuture(GridCacheVersion futVer, boolean sysCache) { + return sysCache ? 
sysCacheAtomicFuts.remove(futVer) : atomicFuts.remove(futVer); } /** @@ -986,7 +1003,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { res.ignoreChildFailures(ClusterTopologyCheckedException.class, CachePartialUpdateCheckedException.class); - for (GridCacheAtomicFuture<?> fut : atomicFuts.values()) { + for (GridCacheAtomicFuture<?> fut : atomicFutures()) { IgniteInternalFuture<Void> complete = fut.completeFuture(topVer); if (complete != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/1e9d9cc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 481317a..8cb5249 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1339,7 +1339,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.returnValue(retVal); if (dhtFut != null) - ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut); + ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut, false); } else // Should remap all keys. 
@@ -2611,7 +2611,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.nodeId(ctx.localNodeId()); - GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); + GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion(), + ctx.marshallerCache()); if (fut != null) fut.onResult(nodeId, res); @@ -2783,7 +2784,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { log.debug("Processing dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc(). - atomicFuture(res.futureVersion()); + atomicFuture(res.futureVersion(), false); if (updateFut != null) updateFut.onResult(nodeId, res); @@ -2802,7 +2803,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { log.debug("Processing deferred dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); for (GridCacheVersion ver : res.futureVersions()) { - GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver); + GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver, false); if (updateFut != null) updateFut.onResult(nodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/1e9d9cc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index e31af19..e01ffc9 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -349,7 +349,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** {@inheritDoc} */ @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { if (super.onDone(res, err)) { - cctx.mvcc().removeAtomicFuture(version()); + cctx.mvcc().removeAtomicFuture(version(), false); if (err != null) { if (!mappings.isEmpty()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/1e9d9cc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index eefdc73..9d8f4c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -327,7 +327,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> GridCacheVersion futVer = state.onFutureDone(); if (futVer != null) - cctx.mvcc().removeAtomicFuture(futVer); + cctx.mvcc().removeAtomicFuture(futVer, cctx.marshallerCache()); return true; } @@ -582,7 +582,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> req = mappings != null ? 
mappings.get(nodeId) : null; if (req != null && req.response() == null) { - res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion(), + res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + nodeId, + req.futureVersion(), cctx.deploymentEnabled()); ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " + @@ -718,7 +720,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> topCompleteFut = null; - cctx.mvcc().removeAtomicFuture(futVer); + cctx.mvcc().removeAtomicFuture(futVer, cctx.marshallerCache()); futVer = null; topVer = AffinityTopologyVersion.ZERO; @@ -847,7 +849,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> futVer = cctx.versions().next(topVer); if (storeFuture()) { - if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) { + if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this, cctx.marshallerCache())) { assert isDone() : GridNearAtomicUpdateFuture.this; return; @@ -999,7 +1001,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> throw new NullPointerException("Null value."); } else if (conflictPutVals != null) { - GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); + GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); val = conflictPutVal.value(); conflictVer = conflictPutVal.version();
