ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e51f6c4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e51f6c4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e51f6c4 Branch: refs/heads/ignite-5075-pds Commit: 4e51f6c426763a5f36dbe16c089dfd04269a618e Parents: f270533 Author: sboikov <[email protected]> Authored: Thu Jun 1 20:11:55 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Jun 1 20:11:55 2017 +0300 ---------------------------------------------------------------------- .../cache/GridCacheConcurrentMap.java | 7 +++- .../cache/GridCacheConcurrentMapImpl.java | 34 +++++++++++----- .../cache/GridCacheLocalConcurrentMap.java | 17 +++++--- .../distributed/dht/GridDhtLocalPartition.java | 43 +++++++++++++------- .../distributed/near/GridNearCacheAdapter.java | 4 +- .../processors/cache/local/GridLocalCache.java | 2 +- 6 files changed, 72 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4e51f6c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java index 816f0b2..282faaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java @@ -116,15 +116,20 @@ public interface GridCacheConcurrentMap { */ static class CacheMapHolder { /** */ + public final GridCacheContext cctx; + + /** */ public final AtomicInteger size = new AtomicInteger(); /** */ public final ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map; /** + * @param cctx Cache context. * @param map Map. */ - public CacheMapHolder(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map) { + public CacheMapHolder(GridCacheContext cctx, ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map) { + this.cctx = cctx; this.map = map; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4e51f6c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java index 12d9980..37f2a51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java @@ -54,7 +54,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM /** {@inheritDoc} */ @Nullable @Override public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key) { - CacheMapHolder hld = entriesMap(ctx.cacheIdBoxed(), false); + CacheMapHolder hld = entriesMapIfExists(ctx.cacheIdBoxed()); return hld != null ? hld.map.get(key) : null; } @@ -66,7 +66,18 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM KeyCacheObject key, final boolean create, final boolean touch) { - CacheMapHolder hld = entriesMap(ctx.cacheIdBoxed(), false); + return putEntryIfObsoleteOrAbsent(null, ctx, topVer, key, create, touch); + } + + protected final GridCacheMapEntry putEntryIfObsoleteOrAbsent( + @Nullable CacheMapHolder hld, + GridCacheContext ctx, + final AffinityTopologyVersion topVer, + KeyCacheObject key, + final boolean create, + final boolean touch) { + if (hld == null) + hld = entriesMapIfExists(ctx.cacheIdBoxed()); GridCacheMapEntry cur = null; GridCacheMapEntry created = null; @@ -94,7 +105,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM } if (hld == null) { - hld = entriesMap(ctx.cacheIdBoxed(), true); + hld = entriesMap(ctx); assert hld != null; } @@ -209,13 +220,16 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM } /** + * @param cctx Cache context. + * @return Map for given cache ID. + */ + @Nullable protected abstract CacheMapHolder entriesMap(GridCacheContext cctx); + + /** * @param cacheId Cache ID. - * @param create Create flag. * @return Map for given cache ID. */ - @Nullable protected abstract CacheMapHolder entriesMap( - Integer cacheId, - boolean create); + @Nullable protected abstract CacheMapHolder entriesMapIfExists(Integer cacheId); /** * @@ -247,7 +261,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM @Override public boolean removeEntry(final GridCacheEntryEx entry) { GridCacheContext ctx = entry.context(); - CacheMapHolder hld = entriesMap(ctx.cacheIdBoxed(), false); + CacheMapHolder hld = entriesMapIfExists(ctx.cacheIdBoxed()); boolean rmv = hld != null && hld.map.remove(entry.key(), entry); @@ -281,7 +295,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM /** {@inheritDoc} */ @Override public Collection<GridCacheMapEntry> entries(int cacheId, final CacheEntryPredicate... filter) { - CacheMapHolder hld = entriesMap(cacheId, false); + CacheMapHolder hld = entriesMapIfExists(cacheId); if (hld == null) return Collections.emptyList(); @@ -297,7 +311,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM /** {@inheritDoc} */ @Override public Set<GridCacheMapEntry> entrySet(int cacheId, final CacheEntryPredicate... filter) { - final CacheMapHolder hld = entriesMap(cacheId, false); + final CacheMapHolder hld = entriesMapIfExists(cacheId); if (hld == null) return Collections.emptySet(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4e51f6c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java index ea1c3eb..63cfe1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheLocalConcurrentMap.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; +import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; /** @@ -31,15 +32,16 @@ public class GridCacheLocalConcurrentMap extends GridCacheConcurrentMapImpl { private final CacheMapHolder entryMap; /** - * @param cacheId Cache ID. + * @param cctx Cache context. * @param factory Entry factory. * @param initCap Initial capacity. */ - public GridCacheLocalConcurrentMap(int cacheId, GridCacheMapEntryFactory factory, int initCap) { + public GridCacheLocalConcurrentMap(GridCacheContext cctx, GridCacheMapEntryFactory factory, int initCap) { super(factory); - this.cacheId = cacheId; - this.entryMap = new CacheMapHolder(new ConcurrentHashMap8<KeyCacheObject, GridCacheMapEntry>(initCap, 0.75f, Runtime.getRuntime().availableProcessors() * 2)); + this.cacheId = cctx.cacheId(); + this.entryMap = new CacheMapHolder(cctx, + new ConcurrentHashMap8<KeyCacheObject, GridCacheMapEntry>(initCap, 0.75f, Runtime.getRuntime().availableProcessors() * 2)); } /** {@inheritDoc} */ @@ -48,9 +50,12 @@ public class GridCacheLocalConcurrentMap extends GridCacheConcurrentMapImpl { } /** {@inheritDoc} */ - @Override protected CacheMapHolder entriesMap(Integer cacheId, boolean create) { - assert this.cacheId == cacheId; + @Nullable @Override protected CacheMapHolder entriesMap(GridCacheContext cctx) { + return entryMap; + } + /** {@inheritDoc} */ + @Nullable @Override protected CacheMapHolder entriesMapIfExists(Integer cacheId) { return entryMap; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4e51f6c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 27c27e4..cba9477 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -190,7 +190,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements cacheMaps = new ConcurrentHashMap<>(); } else { - singleCacheEntryMap = new CacheMapHolder(createEntriesMap()); + singleCacheEntryMap = new CacheMapHolder(grp.singleCacheContext(), createEntriesMap()); cacheMaps = null; } @@ -240,26 +240,31 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** {@inheritDoc} */ - @Override protected CacheMapHolder entriesMap(Integer cacheId, boolean create) { + @Override protected CacheMapHolder entriesMap(GridCacheContext cctx) { if (grp.sharedGroup()) - return create ? cacheMapHolder(cacheId) : cacheMaps.get(cacheId); + return cacheMapHolder(cctx); return singleCacheEntryMap; } + /** {@inheritDoc} */ + @Nullable @Override protected CacheMapHolder entriesMapIfExists(Integer cacheId) { + return grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap; + } + /** - * @param cacheId Cache ID. + * @param cctx Cache context. * @return Map holder. */ - private CacheMapHolder cacheMapHolder(Integer cacheId) { + private CacheMapHolder cacheMapHolder(GridCacheContext cctx) { assert grp.sharedGroup(); - CacheMapHolder hld = cacheMaps.get(cacheId); + CacheMapHolder hld = cacheMaps.get(cctx.cacheIdBoxed()); if (hld != null) return hld; - CacheMapHolder old = cacheMaps.putIfAbsent(cacheId, hld = new CacheMapHolder(createEntriesMap())); + CacheMapHolder old = cacheMaps.putIfAbsent(cctx.cacheIdBoxed(), hld = new CacheMapHolder(cctx, createEntriesMap())); if (old != null) hld = old; @@ -411,7 +416,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @param ver Version. */ private void removeVersionedEntry(int cacheId, KeyCacheObject key, GridCacheVersion ver) { - CacheMapHolder hld = entriesMap(cacheId, false); + CacheMapHolder hld = grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap; GridCacheMapEntry entry = hld != null ? hld.map.get(key) : null; @@ -966,7 +971,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements clear(singleCacheEntryMap.map, extras, rec); if (!grp.allowFastEviction()) { - GridCacheContext cctx = grp.sharedGroup() ? null : grp.singleCacheContext().dhtCache().context(); + CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap; try { GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id); @@ -977,10 +982,18 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements try { CacheDataRow row = it0.next(); - if (grp.sharedGroup() && (cctx == null || cctx.cacheId() != row.cacheId())) - cctx = ctx.cacheContext(row.cacheId()).dhtCache().context(); + if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId())) { + hld = cacheMaps.get(row.cacheId()); + + if (hld == null) + continue; + } + + assert hld != null; - GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(cctx, + GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent( + hld, + hld.cctx, grp.affinity().lastVersion(), row.key(), true, @@ -988,7 +1001,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { if (rec) { - cctx.events().addEvent(cached.partition(), + hld.cctx.events().addEvent(cached.partition(), cached.key(), ctx.localNodeId(), (IgniteUuid)null, @@ -1140,7 +1153,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements @Override public void incrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) { if (grp.sharedGroup()) { if (hld == null) - hld = cacheMapHolder(e.context().cacheIdBoxed()); + hld = cacheMapHolder(e.context()); hld.size.incrementAndGet(); } @@ -1157,7 +1170,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements @Override public void decrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) { if (grp.sharedGroup()) { if (hld == null) - hld = cacheMapHolder(e.context().cacheIdBoxed()); + hld = cacheMapHolder(e.context()); hld.size.decrementAndGet(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4e51f6c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 5914ae5..5b53935 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -93,7 +93,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda @Override public void start() throws IgniteCheckedException { if (map == null) { map = new GridCacheLocalConcurrentMap( - ctx.cacheId(), + ctx, entryFactory(), ctx.config().getNearConfiguration().getNearStartSize()); } @@ -129,7 +129,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda /** {@inheritDoc} */ @Override public void onReconnected() { map = new GridCacheLocalConcurrentMap( - ctx.cacheId(), + ctx, entryFactory(), ctx.config().getNearConfiguration().getNearStartSize()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4e51f6c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index 6252f7d..69d1260 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@ -74,7 +74,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { if (map == null) - map = new GridCacheLocalConcurrentMap(ctx.cacheId(), entryFactory(), DFLT_START_CACHE_SIZE); + map = new GridCacheLocalConcurrentMap(ctx, entryFactory(), DFLT_START_CACHE_SIZE); } /** {@inheritDoc} */
