ignite-12163 remove completeSavingAllocatedIndex method, add test for FullPageIdTable
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2e3e9714 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2e3e9714 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2e3e9714 Branch: refs/heads/ignite-5075-pds Commit: 2e3e97141502a3e4120d46ef314f8d3ac368e050 Parents: 79a510d Author: Dmitriy Govorukhin <[email protected]> Authored: Thu May 11 11:17:07 2017 +0300 Committer: Dmitriy Govorukhin <[email protected]> Committed: Thu May 11 11:17:07 2017 +0300 ---------------------------------------------------------------------- .../GridCacheDatabaseSharedManager.java | 53 ----- .../cache/database/pagemem/FullPageIdTable.java | 4 +- .../database/pagemem/FullPageIdTableTest.java | 96 +++++++++ .../pagemem/PageIdDistributionTest.java | 215 +++++++++++++++++++ 4 files changed, 313 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2e3e9714/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java index 4390582..ef318ad 100755 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java @@ -231,9 +231,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** */ private boolean stopping; - /** Page meta io. */ - private final PageMetaIO pageMetaIO = PageMetaIO.VERSIONS.latest(); - /** Checkpoint runner thread pool. 
*/ private ExecutorService asyncRunner; @@ -2340,56 +2337,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** - * @param pageMem Page mem. - * @param cacheId Cache id. - * @param part Partition. - */ - public void completeSavingAllocatedIndex( - PageMemoryEx pageMem, IgniteWriteAheadLogManager wal, int cacheId, int part - ) throws IgniteCheckedException { - long pageId = getSuperPageId(pageMem, cacheId, part); - long page = pageMem.acquirePage(cacheId, pageId); - try { - long pageAddr = pageMem.writeLock(cacheId, pageId, page); - - boolean wasChanged = false; - - try { - assert PageIO.getPageId(pageAddr) != 0; - - int lastAllocatedIdx = pageMetaIO.getLastPageCount(pageAddr); - int candidateAllocatedIdx = pageMetaIO.getCandidatePageCount(pageAddr); - - if (lastAllocatedIdx != candidateAllocatedIdx) { - if (isWalDeltaRecordNeeded(pageMem, cacheId, pageId, page, wal, null)) - wal.log(new MetaPageUpdateLastAllocatedIndex(cacheId, pageId, candidateAllocatedIdx)); - - pageMetaIO.setLastPageCount(pageAddr, candidateAllocatedIdx); - - wasChanged = true; - } - } - finally { - pageMem.writeUnlock(cacheId, pageId, page, null, wasChanged); - } - } - finally { - pageMem.releasePage(cacheId, pageId, page); - } - } - - /** - * @param pageMem Page mem. - * @param cacheId Cache id. - * @param part Partition. - */ - private static long getSuperPageId(PageMemoryEx pageMem, int cacheId, int part) throws IgniteCheckedException { - return part == PageIdAllocator.INDEX_PARTITION ? 
- pageMem.metaPageId(cacheId) : - pageMem.partitionMetaPageId(cacheId, part); - } - - /** * */ private enum CheckpointEntryType { http://git-wip-us.apache.org/repos/asf/ignite/blob/2e3e9714/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/FullPageIdTable.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/FullPageIdTable.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/FullPageIdTable.java index 2c50998..ab2f86e 100644 --- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/FullPageIdTable.java +++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/FullPageIdTable.java @@ -423,7 +423,7 @@ public class FullPageIdTable { * @param tag Tag. * @return Distance scanned if the entry is found or negative distance scanned, if entry was not found. */ - int distanceFromIdeal(int cacheId, long pageId, int tag) { + public int distanceFromIdeal(int cacheId, long pageId, int tag) { int step = 1; int index = U.safeAbs(FullPageId.hashCode(cacheId, pageId)) % capacity; @@ -461,7 +461,7 @@ public class FullPageIdTable { * * @param visitor Visitor. 
*/ - void visitAll(IgniteBiInClosure<FullPageId, Long> visitor) { + public void visitAll(IgniteBiInClosure<FullPageId, Long> visitor) { for (int i = 0; i < capacity; i++) { if (isValuePresentAt(i)) { long base = entryBase(i); http://git-wip-us.apache.org/repos/asf/ignite/blob/2e3e9714/modules/pds/src/test/java/org/apache/ignite/cache/database/pagemem/FullPageIdTableTest.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/pagemem/FullPageIdTableTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/pagemem/FullPageIdTableTest.java new file mode 100644 index 0000000..cae865e --- /dev/null +++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/pagemem/FullPageIdTableTest.java @@ -0,0 +1,96 @@ +package org.apache.ignite.cache.database.pagemem; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import org.apache.ignite.internal.mem.DirectMemoryRegion; +import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.processors.cache.database.pagemem.FullPageIdTable; +import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class FullPageIdTableTest extends GridCommonAbstractTest { + /** */ + private static final int CACHE_ID_RANGE = 10; + + /** */ + private static final int PAGE_ID_RANGE = 1000; + + /** + * @throws Exception if failed. 
+ */ + public void testRandomOperations() throws Exception { + long mem = FullPageIdTable.requiredMemory(CACHE_ID_RANGE * PAGE_ID_RANGE); + + UnsafeMemoryProvider prov = new UnsafeMemoryProvider(new long[] {mem}); + prov.start(); + + try { + long seed = U.currentTimeMillis(); + + info("Seed: " + seed + "L; //"); + + Random rnd = new Random(seed); + + DirectMemoryRegion region = prov.memory().regions().get(0); + + FullPageIdTable tbl = new FullPageIdTable(region.address(), region.size(), true); + + Map<FullPageId, Long> check = new HashMap<>(); + + for (int i = 0; i < 10_000; i++) { + int cacheId = rnd.nextInt(CACHE_ID_RANGE) + 1; + int pageId = rnd.nextInt(PAGE_ID_RANGE); + + FullPageId fullId = new FullPageId(pageId, cacheId); + + boolean put = rnd.nextInt(3) != -1; + + if (put) { + long val = rnd.nextLong(); + + tbl.put(cacheId, pageId, val, 0); + check.put(fullId, val); + } + else { + tbl.remove(cacheId, pageId, 0); + check.remove(fullId); + } + + verifyLinear(tbl, check); + + if (i > 0 && i % 1000 == 0) + info("Done: " + i); + } + } + finally { + prov.stop(); + } + } + + /** + * @param tbl Table to check. + * @param check Expected mapping. 
+ */ + private void verifyLinear(FullPageIdTable tbl, Map<FullPageId, Long> check) { + final Map<FullPageId, Long> collector = new HashMap<>(); + + tbl.visitAll(new CI2<FullPageId, Long>() { + @Override public void apply(FullPageId fullId, Long val) { + if (collector.put(fullId, val) != null) + throw new AssertionError("Duplicate full page ID mapping: " + fullId); + } + }); + + assertEquals("Size check failed", check.size(), collector.size()); + + for (Map.Entry<FullPageId, Long> entry : check.entrySet()) + assertEquals("Mapping comparison failed for key: " + entry.getKey(), + entry.getValue(), collector.get(entry.getKey())); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2e3e9714/modules/pds/src/test/java/org/apache/ignite/cache/database/pagemem/PageIdDistributionTest.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/pagemem/PageIdDistributionTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/pagemem/PageIdDistributionTest.java new file mode 100644 index 0000000..6d5841b --- /dev/null +++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/pagemem/PageIdDistributionTest.java @@ -0,0 +1,215 @@ +package org.apache.ignite.cache.database.pagemem; + +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.apache.ignite.internal.mem.DirectMemoryRegion; +import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.processors.cache.database.pagemem.FullPageIdTable; +import org.apache.ignite.internal.processors.cache.database.pagemem.PageMemoryImpl; +import org.apache.ignite.internal.util.typedef.T2; +import 
org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class PageIdDistributionTest extends GridCommonAbstractTest { + /** */ + private static final int[] CACHE_IDS = new int[] { + CU.cacheId("partitioned1"), + CU.cacheId("partitioned2"), + CU.cacheId("partitioned3"), + CU.cacheId("partitioned4"), + CU.cacheId("replicated1"), + CU.cacheId("replicated2"), + CU.cacheId("replicated3"), + CU.cacheId("replicated4"), + }; + + /** */ + private static final int PARTS = 1024; + + /** */ + private static final int PAGES = 10240; + + /** + * + */ + public void testDistributions() { + printPageIdDistribution( + CU.cacheId("partitioned"), 1024, 30_000, 32, 2.5f); + + printPageIdDistribution( + CU.cacheId("partitioned"), 1024, 30_000, 64, 2.5f); + + printPageIdDistribution( + CU.cacheId(null), 1024, 30_000, 32, 2.5f); + } + + /** + * @param cacheId Cache id. + * @param parts Parts. + * @param pagesPerPartition Pages per partition. + * @param segments Segments. + * @param capFactor Capacity factor. 
+ */ + private void printPageIdDistribution( + int cacheId, + int parts, + int pagesPerPartition, + int segments, + float capFactor + ) { + int allIds = parts * pagesPerPartition; + + int perSegmentSize = allIds / segments; + int capacity = (int)(perSegmentSize * capFactor); + + info("Total ids: " + allIds); + + List<Map<Integer, Integer>> collisionsPerSegment = new ArrayList<>(segments); + + for (int i = 0; i < segments; i++) + collisionsPerSegment.add(new HashMap<Integer, Integer>(allIds / segments, 1.0f)); + + int[] numInSegment = new int[segments]; + + for (int p = 0; p < parts; p++) { + for (int i = 0; i < pagesPerPartition; i++) { + long pageId = PageIdUtils.pageId(p, (byte)0, i); + + int segment = PageMemoryImpl.segmentIndex(cacheId, pageId, segments); + + int idxInSegment = U.safeAbs(FullPageId.hashCode(cacheId, pageId)) % capacity; + + Map<Integer, Integer> idxCollisions = collisionsPerSegment.get(segment); + + Integer old = idxCollisions.get(idxInSegment); + idxCollisions.put(idxInSegment, old == null ? 1 : old + 1); + + numInSegment[segment]++; + } + } + + for (int i = 0; i < collisionsPerSegment.size(); i++) { + Map<Integer, Integer> idxCollisions = collisionsPerSegment.get(i); + + int distinctPositions = idxCollisions.size(); + + int totalCnt = 0; + int nonZero = 0; + + for (Map.Entry<Integer, Integer> collision : idxCollisions.entrySet()) { + if (collision.getValue() != null) { + totalCnt += collision.getValue(); + nonZero++; + } + } + + info(String.format("Segment stats [i=%d, total=%d, distinct=%d, spaceUsed=%d%%, avgItCnt=%.1f + ']", + i, numInSegment[i], distinctPositions, distinctPositions * 100 / numInSegment[i], + (float)totalCnt / nonZero)); + } + + info("=========================================================="); + } + + /** + * Uncomment and run this test manually to get data to plot histogram for per-element distance from ideal. 
+ * You can use Octave to plot the histogram: + * <pre> + * all = csvread("histo.txt"); + * hist(all, 200) + * </pre> + * + * @throws Exception If failed. + */ + public void _testRealHistory() throws Exception { + int cap = CACHE_IDS.length * PARTS * PAGES; + + info("Capacity: " + cap); + + long mem = FullPageIdTable.requiredMemory(cap); + + info(U.readableSize(mem, true)); + + UnsafeMemoryProvider prov = new UnsafeMemoryProvider(new long[] {mem}); + prov.start(); + + try { + long seed = U.currentTimeMillis(); + + info("Seed: " + seed + "L; //"); + + Random rnd = new Random(seed); + + DirectMemoryRegion region = prov.memory().regions().get(0); + + FullPageIdTable tbl = new FullPageIdTable(region.address(), region.size(), true); + + Map<T2<Integer, Integer>, Integer> allocated = new HashMap<>(); + + for (int i = 0; i < cap; i++) { + int cacheId = CACHE_IDS[rnd.nextInt(CACHE_IDS.length)]; + int partId = rnd.nextInt(PARTS); + + T2<Integer, Integer> key = new T2<>(cacheId, partId); + + Integer pageIdx = allocated.get(key); + + pageIdx = pageIdx == null ? 1 : pageIdx + 1; + + if (pageIdx > PAGES) + continue; + + tbl.put(cacheId, PageIdUtils.pageId(partId, (byte)0, pageIdx), 1, 0); + + allocated.put(key, pageIdx); + + if (i > 0 && i % 100_000 == 0) + info("Done: " + i); + } + + int[] scans = new int[cap]; + + int cur = 0; + + for (T2<Integer, Integer> key : allocated.keySet()) { + Integer alloc = allocated.get(key); + + if (alloc != null) { + for (int idx = 1; idx <= alloc; idx++) { + scans[cur] = tbl.distanceFromIdeal(key.get1(), PageIdUtils.pageId(key.get2(), (byte)0, idx), 0); + + assert scans[cur] != -1; + + cur++; + } + } + } + + try (FileOutputStream out = new FileOutputStream("histo.txt")) { + PrintWriter w = new PrintWriter(new OutputStreamWriter(out)); + + for (int scan : scans) { + if (scan != 0) + w.println(scan); + } + + w.flush(); + } + } + finally { + prov.stop(); + } + } +}
