Repository: ignite Updated Branches: refs/heads/ignite-5075 542c2c906 -> 8e5ec813e
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8e5ec813 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8e5ec813 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8e5ec813 Branch: refs/heads/ignite-5075 Commit: 8e5ec813e7250c67ac94c1b0d9c6b8bfb740bdbb Parents: 542c2c9 Author: sboikov <[email protected]> Authored: Fri May 12 14:39:10 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri May 12 15:08:29 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheGroupInfrastructure.java | 78 ++++++++++++++++---- .../processors/cache/GridCacheProcessor.java | 25 ++----- .../processors/cache/GridCacheTtlManager.java | 17 ++++- .../cache/IgniteCacheOffheapManager.java | 8 +- .../cache/IgniteCacheOffheapManagerImpl.java | 47 ++++++------ .../distributed/dht/GridDhtLocalPartition.java | 13 +--- .../dht/preloader/GridDhtPartitionDemander.java | 2 +- 7 files changed, 120 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5ec813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index ec33685..e638eb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -110,8 +112,7 @@ public class CacheGroupInfrastructure { private boolean needsRecovery; /** */ - private GridCacheContext singleCacheCtx; - + private final List<GridCacheContext> caches; /** * @param grpId Group ID. @@ -152,6 +153,8 @@ public class CacheGroupInfrastructure { (sharedGroup() || memPlc.config().getPageEvictionMode() != DataPageEvictionMode.DISABLED); log = ctx.kernalContext().log(getClass()); + + caches = new ArrayList<>(); } /** @@ -187,21 +190,44 @@ public class CacheGroupInfrastructure { } /** - * @param singleCacheCtx Cache context if group contains single cache. + * @param cctx Cache context. + * @throws IgniteCheckedException If failed. + */ + public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException { + addCacheContext(cctx); + + offheapMgr.onCacheStarted(cctx); + } + + /** + * @param cctx Cache context. + */ + private void addCacheContext(GridCacheContext cctx) { + assert sharedGroup() || caches.isEmpty(); + + boolean add = caches.add(cctx); + + assert add : cctx.name(); + } + + /** + * @param cctx Cache context. */ - public void cacheContext(GridCacheContext singleCacheCtx) { - assert !sharedGroup(); + private void removeCacheContext(GridCacheContext cctx) { + assert sharedGroup() || caches.size() == 1 : caches.size(); - this.singleCacheCtx = singleCacheCtx; + boolean rmv = caches.remove(cctx); + + assert rmv : cctx.name(); } /** * @return Cache context if group contains single cache. */ - public GridCacheContext cacheContext() { - assert !sharedGroup(); + public GridCacheContext singleCacheContext() { + assert !sharedGroup() && caches.size() == 1; - return singleCacheCtx; + return caches.get(0); } // TODO IGNITE-5075: need separate caches with/without queries? @@ -360,17 +386,20 @@ public class CacheGroupInfrastructure { * */ public void onKernalStop() { - preldr.onKernalStop(); + if (preldr != null) // null for LOCAL cache. + preldr.onKernalStop(); offheapMgr.onKernalStop(); } /** - * @param cacheId Cache ID. + * @param cctx Cache context. * @param destroy Destroy flag. */ - void stopCache(int cacheId, boolean destroy) { - offheapMgr.stopCache(cacheId, destroy); + void stopCache(GridCacheContext cctx, boolean destroy) { + offheapMgr.stopCache(cctx.cacheId(), destroy); + + removeCacheContext(cctx); } /** @@ -383,6 +412,29 @@ public class CacheGroupInfrastructure { } /** + * @return {@code True} if group contains caches. + */ + boolean hasCaches() { + return !caches.isEmpty(); + } + + /** + * @param part Partition ID. + */ + public void onPartitionEvicted(int part) { + for (int i = 0; i < caches.size(); i++) { + GridCacheContext cctx = caches.get(i); + + if (cctx.isDrEnabled()) + cctx.dr().partitionEvicted(part); + + cctx.continuousQueries().onPartitionEvicted(part); + + cctx.dataStructures().onPartitionEvicted(part); + } + } + + /** * @throws IgniteCheckedException If failed. */ public void start() throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5ec813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 776f2c8..3e9c6ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1253,7 +1253,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { mgr.stop(cancel, destroy); } - ctx.group().stopCache(ctx.cacheId(), destroy); + ctx.group().stopCache(ctx, destroy); ctx.kernalContext().continuous().onCacheStop(ctx); @@ -1888,9 +1888,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { affNode, true); - if (!grp.sharedGroup()) - grp.cacheContext(cacheCtx); - cacheCtx.dynamicDeploymentId(desc.deploymentId()); GridCacheAdapter cache = cacheCtx.cache(); @@ -1901,6 +1898,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { startCache(cache, schema != null ? schema : new QuerySchema()); + grp.onCacheStarted(cacheCtx); + onKernalStart(cache); } @@ -2050,22 +2049,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheGroupInfrastructure grp = prepareCacheStop(req.request()); - if (grp != null) { - boolean stopGrp = true; - - if (!grp.sharedGroup()) { - for (GridCacheContext cctx : sharedCtx.cacheContexts()) { - if (cctx.group() == grp) { - stopGrp = false; - - break; - } - } - } - - if (stopGrp) - stopCacheGroup(grp.groupId()); - } + if (grp != null && !grp.hasCaches()) + stopCacheGroup(grp.groupId()); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5ec813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index 614b3e3..ad6aa5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -38,7 +38,10 @@ import org.jsr166.LongAdder8; */ public class GridCacheTtlManager extends GridCacheManagerAdapter { /** Entries pending removal. */ - private GridConcurrentSkipListSetEx pendingEntries; + private GridConcurrentSkipListSetEx pendingEntries; + + /** */ + private boolean eagerTtlEnabled; /** */ private final IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> expireC = @@ -79,11 +82,20 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { if (cleanupDisabled) return; + eagerTtlEnabled = true; + cctx.shared().ttl().register(this); pendingEntries = (!cctx.isLocal() && cctx.config().getNearConfiguration() != null) ? new GridConcurrentSkipListSetEx() : null; } + /** + * @return {@code True} if eager ttl is enabled for cache. + */ + boolean eagerTtlEnabled() { + return eagerTtlEnabled; + } + /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { if (pendingEntries != null) @@ -153,7 +165,6 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { try { if (pendingEntries != null) { - //todo may be not only for near? may be for local too. GridNearCacheAdapter nearCache = cctx.near(); GridCacheVersion obsoleteVer = null; @@ -178,7 +189,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { } } - boolean more = cctx.offheap().expire(expireC, amount); + boolean more = cctx.offheap().expire(cctx, expireC, amount); if (more) return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5ec813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 9e1729b..95556f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -47,6 +47,12 @@ public interface IgniteCacheOffheapManager { public void start(GridCacheSharedContext ctx, CacheGroupInfrastructure grp) throws IgniteCheckedException;; /** + * @param cctx Cache context. + * @throws IgniteCheckedException If failed. + */ + public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException; + + /** * */ public void onKernalStop(); @@ -134,7 +140,7 @@ public interface IgniteCacheOffheapManager { * @param c Closure. * @throws IgniteCheckedException If failed. */ - public boolean expire(IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount) throws IgniteCheckedException; + public boolean expire(GridCacheContext cctx, IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount) throws IgniteCheckedException; /** * Gets the number of entries pending expire. http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5ec813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index eb2b1b5..e037572 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -29,10 +29,8 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.DataPageEvictionMode; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.pagemem.FullPageId; -import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -52,10 +50,8 @@ import org.apache.ignite.internal.processors.cache.database.tree.io.IOVersions; import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; @@ -90,7 +86,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager protected CacheGroupInfrastructure grp; /** */ - private IgniteLogger log; + protected IgniteLogger log; /** */ // TODO GG-11208 need restore size after restart. @@ -148,11 +144,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } } - /** - * @throws IgniteCheckedException If failed. - */ - protected void initDataStructures() throws IgniteCheckedException { - if (ctx.ttl().eagerTtlEnabled()) { + /** {@inheritDoc} */ + public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException{ + if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) { String name = "PendingEntries"; long rootPage = allocateForTree(); @@ -167,6 +161,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } } + /** + * @throws IgniteCheckedException If failed. + */ + protected void initDataStructures() throws IgniteCheckedException { + // No-op. + } + /** {@inheritDoc} */ @Override public void stopCache(int cacheId, final boolean destroy) { if (destroy && grp.affinityNode()) @@ -819,9 +820,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public boolean expire( + GridCacheContext cctx, IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount ) throws IgniteCheckedException { + // TODO IGNITE-5075 filter by cache ID if needed. if (hasPendingEntries && pendingEntries != null) { GridCacheVersion obsoleteVer = null; @@ -836,18 +839,18 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (amount != -1 && cleared > amount) return true; -// TODO: IGNITE-5075. -// if (row.key.partition() == -1) -// row.key.partition(cctx.affinity().partition(row.key)); -// -// assert row.key != null && row.link != 0 && row.expireTime != 0 : row; -// -// if (pendingEntries.remove(row) != null) { -// if (obsoleteVer == null) -// obsoleteVer = ctx.versions().next(); -// -// c.apply(cctx.cache().entryEx(row.key), obsoleteVer); -// } + + if (row.key.partition() == -1) + row.key.partition(cctx.affinity().partition(row.key)); + + assert row.key != null && row.link != 0 && row.expireTime != 0 : row; + + if (pendingEntries.remove(row) != null) { + if (obsoleteVer == null) + obsoleteVer = ctx.versions().next(); + + c.apply(cctx.cache().entryEx(row.key), obsoleteVer); + } cleared++; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5ec813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index dc1ad31..3119b52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -163,8 +163,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } }; - // TODO IGNITE-5075. - int delQueueSize = CU.isSystemCache(grp.name()) ? 100 : + int delQueueSize = CU.isSystemCache(grp.config().getName()) ? 100 : Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20); rmvQueueMaxSize = U.ceilPow2(delQueueSize); @@ -726,13 +725,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements assert state() == EVICTED : this; assert evictGuard.get() == -1; -// TODO IGNITE-5075. -// if (cctx.isDrEnabled()) -// cctx.dr().partitionEvicted(id); -// -// cctx.continuousQueries().onPartitionEvicted(id); -// -// cctx.dataStructures().onPartitionEvicted(id); + grp.onPartitionEvicted(id); destroyCacheDataStore(); @@ -921,7 +914,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } if (!grp.allowFastEviction()) { - GridCacheContext cctx = grp.sharedGroup() ? null : grp.cacheContext(); + GridCacheContext cctx = grp.sharedGroup() ? null : grp.singleCacheContext(); try { GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id); http://git-wip-us.apache.org/repos/asf/ignite/blob/8e5ec813/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 8a3b1de..1a9eb68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -751,7 +751,7 @@ public class GridDhtPartitionDemander { GridCacheEntryEx cached = null; try { - GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(entry.cacheId()) : grp.cacheContext(); + GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(entry.cacheId()) : grp.singleCacheContext(); cached = cctx.dht().entryEx(entry.key());
