Repository: ignite Updated Branches: refs/heads/ignite-5075-pds 9d5c677d1 -> aaf9f45b2
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aaf9f45b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aaf9f45b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aaf9f45b Branch: refs/heads/ignite-5075-pds Commit: aaf9f45b2d9b9a9c62e1259ffb690b32915658c8 Parents: 9d5c677 Author: sboikov <[email protected]> Authored: Tue May 23 13:44:05 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue May 23 13:44:05 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManager.java | 9 +++- .../cache/IgniteCacheOffheapManagerImpl.java | 22 ++++++++- .../distributed/dht/GridDhtLocalPartition.java | 5 +- .../GridCacheDatabaseSharedManager.java | 7 ++- .../cache/database/GridCacheOffheapManager.java | 48 ++++++++++++++------ .../IgnitePersistentStoreCacheGroupsTest.java | 4 ++ .../ignite/testsuites/IgnitePdsTestSuite.java | 3 ++ 7 files changed, 75 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf9f45b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 6c5d8a0..65ba4b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import java.util.Map; import javax.cache.Cache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -373,8 +374,9 @@ public interface IgniteCacheOffheapManager { /** * @param size Size to init. * @param updCntr Update counter to init. + * @param cacheSizes Cache sizes if store belongs to group containing multiple caches. */ - void init(long size, long updCntr); + void init(long size, long updCntr, @Nullable Map<Integer, Long> cacheSizes); /** * @param cacheId Cache ID. @@ -383,6 +385,11 @@ public interface IgniteCacheOffheapManager { int cacheSize(int cacheId); /** + * @return Cache sizes if store belongs to group containing multiple caches. + */ + Map<Integer, Long> cacheSizes(); + + /** * @return Total size. */ int fullSize(); http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf9f45b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index a864b46..50e01e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -1022,6 +1024,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ + @Override public Map<Integer, Long> cacheSizes() { + if (!grp.sharedGroup()) + return null; + + Map<Integer, Long> res = new HashMap<>(); + + for (Map.Entry<Integer, AtomicLong> e : cacheSizes.entrySet()) + res.put(e.getKey(), e.getValue().longValue()); + + return res; + } + + /** {@inheritDoc} */ @Override public int fullSize() { return (int)storageSize.get(); } @@ -1491,10 +1506,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public void init(long size, long updCntr) { + @Override public void init(long size, long updCntr, @Nullable Map<Integer, Long> cacheSizes) { initCntr = updCntr; storageSize.set(size); cntr.set(updCntr); + + if (cacheSizes != null) { + for (Map.Entry<Integer, Long> e : cacheSizes.entrySet()) + this.cacheSizes.put(e.getKey(), new AtomicLong(e.getValue())); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf9f45b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index bcee461..07c6136 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -56,12 +56,10 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; @@ -614,11 +612,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (update) try { - // TODO IGNITE-5075. ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, toState, updateCounter())); } catch (IgniteCheckedException e) { - log.error("Error while writing to log", e); + U.error(log, "Error while writing to log", e); } return update; http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf9f45b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java index 624a1c0..6635e2d 100755 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java @@ -665,7 +665,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (cctx.kernalContext().query().moduleEnabled()) { for (GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) { if (cacheCtx.startTopologyVersion().equals(fut.topologyVersion()) && - !cctx.pageStore().hasIndexStore(cacheCtx.cacheId()) && cacheCtx.affinityNode()) { + !cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && cacheCtx.affinityNode()) { final int cacheId = cacheCtx.cacheId(); final IgniteInternalFuture<?> rebuildFut = cctx.kernalContext().query() @@ -849,7 +849,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * @throws IgniteCheckedException If failed to restore database status from WAL. */ - public void restoreState() throws IgniteCheckedException { + private void restoreState() throws IgniteCheckedException { try { CheckpointStatus status = readCheckpointStatus(); @@ -1365,7 +1365,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @throws IgniteCheckedException if no MemoryPolicy is configured for a name obtained from cache descriptor. */ private PageMemoryEx getPageMemoryForCacheGroup(int grpId) throws IgniteCheckedException { - // TODO IGNITE-5075: group ID should not change. // TODO IGNITE-5075: cache descriptor can be removed. GridCacheSharedContext sharedCtx = context(); @@ -1375,7 +1374,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan .config() .getMemoryPolicyName(); - return (PageMemoryEx) sharedCtx.database().memoryPolicy(memPlcName).pageMemory(); + return (PageMemoryEx)sharedCtx.database().memoryPolicy(memPlcName).pageMemory(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf9f45b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java index 0be5652..eacf987 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java @@ -179,10 +179,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple freeList.saveMetadata(); - // TODO IGNITE-5075. + // TODO IGNITE-5075: save cache sizes. long updCntr = store.updateCounter(); int size = store.fullSize(); long rmvId = globalRemoveId().get(); + Map<Integer, Long> cacheSizes = grp.sharedGroup() ? store.cacheSizes() : null; PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory(); IgniteWriteAheadLogManager wal = this.ctx.wal(); @@ -221,11 +222,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple io.setPartitionState(pageAddr, (byte)state); - int pageCount; + int pageCnt; if (beforeSnapshot) { - pageCount = this.ctx.pageStore().pages(grpId, store.partId()); - io.setCandidatePageCount(pageAddr, pageCount); + pageCnt = this.ctx.pageStore().pages(grpId, store.partId()); + io.setCandidatePageCount(pageAddr, pageCnt); if (saveMeta) { long metaPageId = pageMem.metaPageId(grpId); @@ -265,7 +266,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple this.ctx.pageStore().pages(grpId, store.partId())); } else - pageCount = io.getCandidatePageCount(pageAddr); + pageCnt = io.getCandidatePageCount(pageAddr); if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) wal.log(new MetaPageUpdatePartitionDataRecord( @@ -275,7 +276,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple rmvId, size, (byte)state, - pageCount + pageCnt )); } finally { @@ -804,6 +805,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple int grpId = grp.groupId(); long partMetaId = pageMem.partitionMetaPageId(grpId, partId); long partMetaPage = pageMem.acquirePage(grpId, partMetaId); + try { long pageAddr = pageMem.readLock(grpId, partMetaId, partMetaPage); @@ -811,7 +813,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (PageIO.getType(pageAddr) != 0) { PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.latest(); - delegate0.init(io.getSize(pageAddr), io.getUpdateCounter(pageAddr)); + // TODO IGNITE-5075. + delegate0.init(io.getSize(pageAddr), io.getUpdateCounter(pageAddr), null); globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr)); } @@ -945,6 +948,30 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ + @Override public int cacheSize(int cacheId) { + try { + CacheDataStore delegate0 = init0(true); + + return delegate0 == null ? 0 : delegate0.cacheSize(cacheId); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public Map<Integer, Long> cacheSizes() { + try { + CacheDataStore delegate0 = init0(true); + + return delegate0 == null ? null : delegate0.cacheSizes(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ @Override public long updateCounter() { try { CacheDataStore delegate0 = init0(true); @@ -957,7 +984,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public void init(long size, long updCntr) { + @Override public void init(long size, long updCntr, @Nullable Map<Integer, Long> cacheSizes) { throw new IllegalStateException("Should be never called."); } @@ -1114,11 +1141,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } /** {@inheritDoc} */ - @Override public int cacheSize(int cacheId) { - return 0; - } - - /** {@inheritDoc} */ @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException { CacheDataStore delegate = init0(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf9f45b/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java index b184375..8796fb0 100644 --- a/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java +++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java @@ -118,6 +118,8 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest assertEquals(cacheName + i, cache.get(i)); } + + assertEquals(10, cache.size()); } stopGrid(0); @@ -129,6 +131,8 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest for (int i = 0; i < 10; i++) assertEquals(cacheName + i, cache.get(i)); + + assertEquals(10, cache.size()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/aaf9f45b/modules/pds/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/pds/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java index c3e5a70..c781176 100644 --- a/modules/pds/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java +++ b/modules/pds/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.cache.database.IgnitePersistentStoreCacheGroupsTest; import org.apache.ignite.cache.database.IgnitePersistentStoreClientNearCachePutGetWithPersistenceSelfTest; import org.apache.ignite.cache.database.IgnitePersistentStoreDynamicCacheTest; import org.apache.ignite.cache.database.IgnitePersistentStoreSingleNodePutGetPersistenceSelfTest; @@ -85,6 +86,8 @@ public class IgnitePdsTestSuite extends TestSuite { suite.addTestSuite(IgniteWalDirectoriesConfigurationTest.class); suite.addTestSuite(IgnitePersistentStoreClientNearCachePutGetWithPersistenceSelfTest.class); + suite.addTestSuite(IgnitePersistentStoreCacheGroupsTest.class); + return suite; } }
