Repository: ignite
Updated Branches:
  refs/heads/ignite-1537 5b2f375d5 -> 7cf8ed26b
ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7cf8ed26 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7cf8ed26 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7cf8ed26 Branch: refs/heads/ignite-1537 Commit: 7cf8ed26b6ba692b7860cd9e258064baf0bd554c Parents: 5b2f375 Author: sboikov <[email protected]> Authored: Tue Dec 22 16:47:19 2015 +0300 Committer: sboikov <[email protected]> Committed: Tue Dec 22 16:47:19 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheContext.java | 12 -- .../processors/cache/GridCacheMvccManager.java | 51 +++---- .../dht/atomic/GridDhtAtomicCache.java | 10 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 137 ++++++++++++------- 5 files changed, 106 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf8ed26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 0c2f67c..c10ebf3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -244,9 +244,6 @@ public class GridCacheContext<K, V> implements Externalizable { /** */ private boolean deferredDel; - /** */ - private boolean marshallerCache; - /** * Empty constructor required for {@link Externalizable}. 
*/ @@ -364,15 +361,6 @@ public class GridCacheContext<K, V> implements Externalizable { expiryPlc = null; itHolder = new CacheWeakQueryIteratorsHolder(log); - - marshallerCache = cacheType == CacheType.MARSHALLER; - } - - /** - * @return {@code True} if marshaller cache. - */ - public boolean marshallerCache() { - return marshallerCache; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf8ed26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 46bd093..dbc6992 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -106,10 +106,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>(); - /** Pending atomic futures. */ - private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> sysCacheAtomicFuts = - new ConcurrentHashMap8<>(); - /** */ private final ConcurrentMap<IgniteUuid, GridCacheFuture<?>> futs = new ConcurrentHashMap8<>(); @@ -219,28 +215,18 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { for (GridCacheFuture<?> fut : activeFutures()) fut.onNodeLeft(discoEvt.eventNode().id()); - onNodeLeft(discoEvt.eventNode().id(), sysCacheAtomicFuts); - - onNodeLeft(discoEvt.eventNode().id(), atomicFuts); - } - }; - - /** - * @param nodeId Failed node ID. - * @param atomicFuts Futures collection. 
- */ - private void onNodeLeft(UUID nodeId, ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts) { - for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) { - cacheFut.onNodeLeft(nodeId); + for (GridCacheAtomicFuture<?> cacheFut : atomicFuts.values()) { + cacheFut.onNodeLeft(discoEvt.eventNode().id()); - if (cacheFut.isCancelled() || cacheFut.isDone()) { - GridCacheVersion futVer = cacheFut.version(); + if (cacheFut.isCancelled() || cacheFut.isDone()) { + GridCacheVersion futVer = cacheFut.version(); - if (futVer != null) - atomicFuts.remove(futVer, cacheFut); + if (futVer != null) + atomicFuts.remove(futVer, cacheFut); + } } } - } + }; /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { @@ -364,7 +350,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { for (GridCacheFuture<?> fut : activeFutures()) ((GridFutureAdapter)fut).onDone(err); - for (GridCacheAtomicFuture<?> future : atomicFutures()) + for (GridCacheAtomicFuture<?> future : atomicFuts.values()) ((GridFutureAdapter)future).onDone(err); } @@ -404,12 +390,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** * @param futVer Future ID. - * @param sysCache If {@code true} uses special futures collection. * @param fut Future. * @return {@code False} if future was forcibly completed with error. */ - public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut, boolean sysCache) { - IgniteInternalFuture<?> old = sysCache ? 
sysCacheAtomicFuts.put(futVer, fut) : atomicFuts.put(futVer, fut); + public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) { + IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut); assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']'; @@ -420,27 +405,25 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @return Collection of pending atomic futures. */ public Collection<GridCacheAtomicFuture<?>> atomicFutures() { - return F.concat(false, sysCacheAtomicFuts.values(), atomicFuts.values()); + return atomicFuts.values(); } /** * Gets future by given future ID. * * @param futVer Future ID. - * @param sysCache If {@code true} uses special futures collection. * @return Future. */ - @Nullable public IgniteInternalFuture<?> atomicFuture(GridCacheVersion futVer, boolean sysCache) { - return sysCache ? sysCacheAtomicFuts.get(futVer) : atomicFuts.get(futVer); + @Nullable public IgniteInternalFuture<?> atomicFuture(GridCacheVersion futVer) { + return atomicFuts.get(futVer); } /** * @param futVer Future ID. - * @param sysCache If {@code true} uses special futures collection. * @return Removed future. */ - @Nullable public IgniteInternalFuture<?> removeAtomicFuture(GridCacheVersion futVer, boolean sysCache) { - return sysCache ? 
sysCacheAtomicFuts.remove(futVer) : atomicFuts.remove(futVer); + @Nullable public IgniteInternalFuture<?> removeAtomicFuture(GridCacheVersion futVer) { + return atomicFuts.remove(futVer); } /** @@ -1003,7 +986,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { res.ignoreChildFailures(ClusterTopologyCheckedException.class, CachePartialUpdateCheckedException.class); - for (GridCacheAtomicFuture<?> fut : atomicFutures()) { + for (GridCacheAtomicFuture<?> fut : atomicFuts.values()) { IgniteInternalFuture<Void> complete = fut.completeFuture(topVer); if (complete != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf8ed26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 3c8b7d4..6942d87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1350,7 +1350,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.returnValue(retVal); if (dhtFut != null) - ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut, false); + ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut); } else // Should remap all keys. 
@@ -2622,8 +2622,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { res.nodeId(ctx.localNodeId()); - GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion(), - ctx.marshallerCache()); + GridNearAtomicUpdateFuture fut = (GridNearAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); if (fut != null) fut.onResult(nodeId, res); @@ -2794,8 +2793,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (log.isDebugEnabled()) log.debug("Processing dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); - GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc(). - atomicFuture(res.futureVersion(), false); + GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); if (updateFut != null) updateFut.onResult(nodeId, res); @@ -2814,7 +2812,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { log.debug("Processing deferred dht atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); for (GridCacheVersion ver : res.futureVersions()) { - GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver, false); + GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver); if (updateFut != null) updateFut.onResult(nodeId); http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf8ed26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index e01ffc9..e31af19 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -349,7 +349,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** {@inheritDoc} */ @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) { if (super.onDone(res, err)) { - cctx.mvcc().removeAtomicFuture(version(), false); + cctx.mvcc().removeAtomicFuture(version()); if (err != null) { if (!mappings.isEmpty()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/7cf8ed26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 9aa6fa6..e617f43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.UUID; @@ -288,7 +287,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> // Cannot remap. 
remapCnt = 1; - state.map(topVer); + state.map(topVer, null); } } @@ -327,7 +326,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> GridCacheVersion futVer = state.onFutureDone(); if (futVer != null) - cctx.mvcc().removeAtomicFuture(futVer, cctx.marshallerCache()); + cctx.mvcc().removeAtomicFuture(futVer); return true; } @@ -415,7 +414,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> cache.topology().readUnlock(); } - state.map(topVer); + state.map(topVer, null); } /** @@ -721,7 +720,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> topCompleteFut = null; - cctx.mvcc().removeAtomicFuture(futVer, cctx.marshallerCache()); + cctx.mvcc().removeAtomicFuture(futVer); futVer = null; topVer = AffinityTopologyVersion.ZERO; @@ -790,7 +789,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> try { AffinityTopologyVersion topVer = fut.get(); - map(topVer); + map(topVer, remapKeys); } catch (IgniteCheckedException e) { onDone(e); @@ -826,8 +825,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** * @param topVer Topology version. + * @param remapKeys Keys to remap. 
*/ - void map(AffinityTopologyVersion topVer) { + void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) { Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { @@ -839,68 +839,86 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> Exception err = null; GridNearAtomicUpdateRequest singleReq0 = null; - Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null; + Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null; int size = keys.size(); - synchronized (this) { - assert futVer == null : this; - assert this.topVer == AffinityTopologyVersion.ZERO : this; + GridCacheVersion futVer = cctx.versions().next(topVer); + + if (storeFuture()) { + if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) { + assert isDone() : GridNearAtomicUpdateFuture.this; - resCnt = 0; + return; + } + } - this.topVer = topVer; + GridCacheVersion updVer; - futVer = cctx.versions().next(topVer); + // Assign version on near node in CLOCK ordering mode even if fastMap is false. + if (cctx.config().getAtomicWriteOrderMode() == CLOCK) { + updVer = this.updVer; - if (storeFuture()) { - if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this, cctx.marshallerCache())) { - assert isDone() : GridNearAtomicUpdateFuture.this; + if (updVer == null) { + updVer = cctx.versions().next(topVer); - return; - } + if (log.isDebugEnabled()) + log.debug("Assigned fast-map version for update on near node: " + updVer); } + } + else + updVer = null; - // Assign version on near node in CLOCK ordering mode even if fastMap is false. - if (updVer == null) - updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? 
cctx.versions().next(topVer) : null; + try { + if (size == 1 && !fastMap) { + assert remapKeys == null || remapKeys.size() == 1; - if (updVer != null && log.isDebugEnabled()) - log.debug("Assigned fast-map version for update on near node: " + updVer); + singleReq0 = mapSingleUpdate(topVer, futVer, updVer); + } + else { + Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes, + topVer, + futVer, + updVer, + remapKeys); + + if (pendingMappings.size() == 1) + singleReq0 = F.firstValue(pendingMappings); + else { + if (syncMode == PRIMARY_SYNC) { + mappings0 = U.newHashMap(pendingMappings.size()); - try { - if (size == 1 && !fastMap) { - assert remapKeys == null || remapKeys.size() == 1; + for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { + if (req.hasPrimary()) + mappings0.put(req.nodeId(), req); + } + } + else + mappings0 = pendingMappings; - singleReq0 = singleReq = mapSingleUpdate(); + assert !mappings0.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this; } - else { - pendingMappings = mapUpdate(topNodes); + } - if (pendingMappings.size() == 1) - singleReq0 = singleReq = F.firstValue(pendingMappings); - else { - if (syncMode == PRIMARY_SYNC) { - mappings = U.newHashMap(pendingMappings.size()); + synchronized (this) { + assert this.futVer == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; - for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { - if (req.hasPrimary()) - mappings.put(req.nodeId(), req); - } - } - else - mappings = new HashMap<>(pendingMappings); + this.topVer = topVer; + this.updVer = updVer; + this.futVer = futVer; - assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this; - } - } + resCnt = 0; - remapKeys = null; - } - catch (Exception e) { - err = e; + singleReq = singleReq0; + mappings = mappings0; + + this.remapKeys = null; } } + catch (Exception e) { + err = e; + } if (err != null) { onDone(err); @@ -912,12 +930,12 @@ public class 
GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (singleReq0 != null) mapSingle(singleReq0.nodeId(), singleReq0); else { - assert pendingMappings != null; + assert mappings0 != null; if (size == 0) onDone(new GridCacheReturn(cctx, true, true, null, true)); else - doUpdate(pendingMappings); + doUpdate(mappings0); } } @@ -965,10 +983,18 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** * @param topNodes Cache nodes. + * @param topVer Topology version. + * @param futVer Future version. + * @param updVer Update version. + * @param remapKeys Keys to remap. * @return Mapping. * @throws Exception If failed. */ - private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception { + private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes, + AffinityTopologyVersion topVer, + GridCacheVersion futVer, + @Nullable GridCacheVersion updVer, + @Nullable Collection<KeyCacheObject> remapKeys) throws Exception { Iterator<?> it = null; if (vals != null) @@ -1089,10 +1115,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** + * @param topVer Topology version. + * @param futVer Future version. + * @param updVer Update version. * @return Request. * @throws Exception If failed. */ - private GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception { + private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, + GridCacheVersion futVer, + @Nullable GridCacheVersion updVer) throws Exception { Object key = F.first(keys); Object val;
