Repository: ignite Updated Branches: refs/heads/master ac3d54504 -> 1f8ec79d2
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/RobinHoodBackwardShiftHashMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/RobinHoodBackwardShiftHashMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/RobinHoodBackwardShiftHashMap.java new file mode 100644 index 0000000..fd4e0e6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/RobinHoodBackwardShiftHashMap.java @@ -0,0 +1,654 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.util.function.BiConsumer; +import org.apache.ignite.internal.mem.IgniteOutOfMemoryException; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.NotNull; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_LONG_HASH_MAP_LOAD_FACTOR; +import static org.apache.ignite.IgniteSystemProperties.getFloat; +import static org.apache.ignite.internal.util.GridUnsafe.getInt; +import static org.apache.ignite.internal.util.GridUnsafe.getLong; +import static org.apache.ignite.internal.util.GridUnsafe.putInt; +import static org.apache.ignite.internal.util.GridUnsafe.putLong; + +/** + * Loaded pages mapping to relative pointer based on Robin Hood hashing: backward shift deletion algorithm. <br> + * Performance of initial Robin Hood hashing could be greatly improved with only a little change to the removal + * method.<br> Instead of replacing entries with 'Removed' fake entries on deletion, backward shift deletion variant + * for the Robin Hood hashing algorithm does shift backward all the entries following the entry to delete until + * either an empty bucket, or a bucket with a DIB of 0 (distance to initial bucket).<br> + * + * Every deletion will shift backwards entries and therefore decrease their respective DIBs by 1 + * (all their initial DIB values would be >= 1)<br> + * + * This implementation stores ideal bucket with entry value itself.<br> + * + */ +public class RobinHoodBackwardShiftHashMap implements LoadedPagesMap { + /** Load factor. */ + private static final float LOAD_FACTOR = getFloat(IGNITE_LONG_LONG_HASH_MAP_LOAD_FACTOR, 2.5f); + + /** Size of count of entries (value returned by size() method). */ + private static final int MAPSIZE_SIZE = 4; + + /** Padding to provide read/write from word beginning for each cell. Change this to 0 if padding is not required. */ + private static final int CELL_PADDING = 4; + + /** Padding to provide read/write from word beginning. Change this to 0 if padding is not required. */ + private static final int MAPSIZE_PADDING = 4; + + /** Count of entries offset starting from base address. */ + private static final int MAPSIZE_OFFSET = 0; + + /** Size of initial/ideal bucket (cell to store value to avoid probing other cells followed). */ + private static final int IDEAL_BUCKET_SIZE = 4; + + /** Offset of initial/ideal bucket starting from entry base. */ + private static final int IDEAL_BUCKET_OFFSET = 0; + + /** Group ID size. */ + private static final int GRP_ID_SIZE = 4; + + /** Group ID offset from entry base. */ + private static final int GRP_ID_OFFSET = IDEAL_BUCKET_OFFSET + IDEAL_BUCKET_SIZE; + + /** Page ID size. */ + private static final int PAGE_ID_SIZE = 8; + + /** Page ID offset from entry base. */ + private static final int PAGE_ID_OFFSET = GRP_ID_OFFSET + GRP_ID_SIZE; + + /** Value size. */ + private static final int VALUE_SIZE = 8; + + /** Value offset from entry base. */ + private static final int VALUE_OFFSET = PAGE_ID_OFFSET + PAGE_ID_SIZE; + + /** Version (tag/generation) offset from entry base. */ + private static final int VERSION_SIZE = 4; + + /** Version (tag/generation) offset from entry base. */ + private static final int VERSION_OFFSET = VALUE_OFFSET + VALUE_SIZE; + + /** Page ID used for empty bucket. */ + private static final long EMPTY_PAGE_ID = 0; + + /** Cache Group ID used for empty bucket. */ + private static final int EMPTY_CACHE_GRP_ID = 0; + + /** Bytes required for storing one entry (cell). */ + private static final int BYTES_PER_CELL = IDEAL_BUCKET_SIZE + + GRP_ID_SIZE + PAGE_ID_SIZE + + VALUE_SIZE + VERSION_SIZE + + CELL_PADDING; + + /** Number of buckets, indicates range of scan memory, max probe count and maximum map size. */ + private final int numBuckets; + + /** Base address of map content. */ + private long baseAddr; + + /** + * @param elementsCnt Maximum elements can be stored in map, its maximum size. + * @return Estimated memory size required for this map to store the given number of elements. + */ + public static long requiredMemory(long elementsCnt) { + float loadFactor = LOAD_FACTOR; + + assert loadFactor != 0; + + return requiredMemoryByBuckets((long)(elementsCnt * loadFactor)); + } + + /** + * @param numBuckets Number of buckets (cells) to store, capacity. + * @return required size to allocate, based on number of buckets (cells) to store in map, its capacity. + */ + static long requiredMemoryByBuckets(long numBuckets) { + return numBuckets * BYTES_PER_CELL + MAPSIZE_SIZE + MAPSIZE_PADDING; + } + + /** + * Creates map in preallocated unsafe memory segment. + * + * @param baseAddr Base buffer address. + * @param size Size available for map, number of buckets (cells) to store will be determined accordingly. + */ + public RobinHoodBackwardShiftHashMap(long baseAddr, long size) { + this.numBuckets = (int)((size - MAPSIZE_SIZE - MAPSIZE_PADDING) / BYTES_PER_CELL); + this.baseAddr = baseAddr; + + GridUnsafe.setMemory(baseAddr, size, (byte)0); + } + + /** + * @param idx cell index. + * @return base cell (bucket) address in buffer. + */ + private long entryBase(int idx) { + assert idx >= 0 && idx < numBuckets; + + return baseAddr + MAPSIZE_SIZE + MAPSIZE_PADDING + (long)idx * BYTES_PER_CELL; + } + + /** {@inheritDoc} */ + @Override public long get(int grpId, long pageId, int reqVer, long absent, long outdated) { + assert grpId != EMPTY_CACHE_GRP_ID; + + // initial index is also ideal for searhed element + int idxInit = U.safeAbs(FullPageId.hashCode(grpId, pageId)) % numBuckets; + + for (int i = 0; i < numBuckets; i++) { + int idxCurr = (idxInit + i) % numBuckets; + + final long base = entryBase(idxCurr); + final int distanceFromInit = distance(idxCurr, idxInit); + + final int curGrpId = getGrpId(base); + final long curPageId = getPageId(base); + + final int dibCurEntry = distance(idxCurr, getIdealBucket(base)); + + if (isEmpty(curGrpId, curPageId)) + return absent; + else if (curGrpId == grpId && curPageId == pageId) { + //equal value found + long actualVer = getVersion(base); + boolean freshVal = actualVer >= reqVer; + + return freshVal ? getValue(base) : outdated; + } + else if (dibCurEntry < distanceFromInit) { + //current entry has quite good position, it would be swapped at hypothetical insert of current value + return absent; + } + } + + return absent; + } + + /** {@inheritDoc} */ + @Override public void put(int grpId, long pageId, long val, int ver) { + assert grpId != 0; + + // initial index is also ideal for inserted element + int idxInit = U.safeAbs(FullPageId.hashCode(grpId, pageId)) % numBuckets; + + int swapCnt = 0; + + int grpIdToInsert = grpId; + long pageIdToInsert = pageId; + long valToInsert = val; + int verToInsert = ver; + long idxIdealToInsert = idxInit; + + for (int i = 0; i < numBuckets; i++) { + int idxCurr = (idxInit + i) % numBuckets; + + final long base = entryBase(idxCurr); + final int dibEntryToInsert = distance(idxCurr, idxInit); + + final int curGrpId = getGrpId(base); + final long curPageId = getPageId(base); + final int curIdealBucket = getIdealBucket(base); + final long curVal = getValue(base); + final int curVer = getVersion(base); + final int dibCurEntry = distance(idxCurr, curIdealBucket); + + if (isEmpty(curGrpId, curPageId)) { + setCellValue(base, idxIdealToInsert, grpIdToInsert, pageIdToInsert, valToInsert, verToInsert); + + setSize(size() + 1); + + return; + } + else if (curGrpId == grpIdToInsert && curPageId == pageIdToInsert) { + if (swapCnt != 0) + throw new IllegalStateException("Swapped " + swapCnt + " times. Entry: " + dumpEntry(idxCurr)); + + setValue(base, valToInsert); + + return; //equal value found + } + else if (dibCurEntry < dibEntryToInsert) { + //swapping *toInsert and state in bucket: save cur state to bucket + setCellValue(base, idxIdealToInsert, grpIdToInsert, pageIdToInsert, valToInsert, verToInsert); + + idxIdealToInsert = curIdealBucket; + pageIdToInsert = curPageId; + grpIdToInsert = curGrpId; + valToInsert = curVal; + verToInsert = curVer; + + swapCnt++; + } + } + + // no free space left + throw new IgniteOutOfMemoryException("No room for a new key"); + } + + /** {@inheritDoc} */ + @Override public boolean remove(int grpId, long pageId) { + assert grpId != EMPTY_CACHE_GRP_ID; + + int idxInit = U.safeAbs(FullPageId.hashCode(grpId, pageId)) % numBuckets; + + int idxEqualValFound = -1; + for (int i = 0; i < numBuckets; i++) { + int idxCurr = (idxInit + i) % numBuckets; + + final long base = entryBase(idxCurr); + final int dibEntryToInsert = distance(idxCurr, idxInit); + + final int curGrpId = getGrpId(base); + final long curPageId = getPageId(base); + final int curIdealBucket = getIdealBucket(base); + final int dibCurEntry = distance(idxCurr, curIdealBucket); + + if (isEmpty(curGrpId, curPageId)) + return false; + else if (curGrpId == grpId && curPageId == pageId) { + idxEqualValFound = idxCurr; + + break; //equal value found + } + else if (dibCurEntry < dibEntryToInsert) { + //If our value was present in map we had already found it. + return false; + } + } + + setSize(size() - 1); + + doBackwardShift(idxEqualValFound); + + return true; + } + + /** + * Runs backward shifts from current index to . + * + * @param idxRmv removed index. + */ + private void doBackwardShift(int idxRmv) { + assert idxRmv >= 0; + + //scanning rest of map to perform backward shifts + for (int i = 0; i < numBuckets - 1; i++) { + int idxCurr = (idxRmv + i) % numBuckets; + int idxNext = (idxRmv + i + 1) % numBuckets; + + long baseCurr = entryBase(idxCurr); + + long baseNext = entryBase(idxNext); + final int nextGrpId = getGrpId(baseNext); + final long nextPageId = getPageId(baseNext); + final int nextIdealBucket = getIdealBucket(baseNext); + final int nextEntryVer = getVersion(baseNext); + + if (isEmpty(nextGrpId, nextPageId) + || distance(idxNext, nextIdealBucket) == 0) { + setEmpty(baseCurr); + + return; + } + else + setCellValue(baseCurr, nextIdealBucket, nextGrpId, nextPageId, getValue(baseNext), nextEntryVer); + } + + int lastShiftedIdx = (idxRmv - 1) % numBuckets; + + setEmpty(entryBase(lastShiftedIdx)); + } + + /** {@inheritDoc} */ + @Override public ReplaceCandidate getNearestAt(final int idxStart) { + for (int i = 0; i < numBuckets; i++) { + int idxCurr = (idxStart + i) % numBuckets; + + if (isEmptyAt(idxCurr)) + continue; + + long base = entryBase(idxCurr); + + return new ReplaceCandidate(getVersion(base), getValue(base), getFullPageId(base)); + } + + return null; + } + + /** + * @param idx Index to test. + * @return {@code True} if value is not provided in cell having index. + */ + private boolean isEmptyAt(int idx) { + long base = entryBase(idx); + + return isEmpty(getGrpId(base), getPageId(base)); + } + + /** {@inheritDoc} */ + @Override public long refresh(int grpId, long pageId, int ver) { + assert grpId != EMPTY_CACHE_GRP_ID; + + int idxInit = U.safeAbs(FullPageId.hashCode(grpId, pageId)) % numBuckets; + + for (int i = 0; i < numBuckets; i++) { + int idxCurr = (idxInit + i) % numBuckets; + + final long base = entryBase(idxCurr); + final int distanceFromInit = distance(idxCurr, idxInit); + + final int curGrpId = getGrpId(base); + final long curPageId = getPageId(base); + final int curIdealBucket = getIdealBucket(base); + final int dibCurEntry = distance(idxCurr, curIdealBucket); + + if (isEmpty(curGrpId, curPageId)) + break; // break to fail + else if (curGrpId == grpId && curPageId == pageId) { + //equal value found + long actualVer = getVersion(base); + + boolean freshVal = actualVer >= ver; + + if (freshVal) { + throw new IllegalArgumentException("Fresh element found at " + + dumpEntry(idxCurr) + " during search of cell to refresh. " + + "Refresh should be called for existent outdated element. "); + } + + setVersion(base, ver); + + return getValue(base); + } + else if (dibCurEntry < distanceFromInit) { + //current entry has quite good position, it would be swapped at hypothetical insert of current value + + break; + } + } + + throw new IllegalArgumentException("Element not found group ID: " + grpId + ", page ID: " + pageId + + " during search of cell to refresh. Refresh should be called for existent outdated element. "); + } + + /** {@inheritDoc} */ + @Override public GridLongList removeIf(int startIdxToClear, int endIdxToClear, KeyPredicate keyPred) { + assert endIdxToClear >= startIdxToClear + : "Start and end indexes are not consistent: {" + startIdxToClear + ", " + endIdxToClear + "}"; + + int sz = endIdxToClear - startIdxToClear; + + GridLongList list = new GridLongList(sz); + + for (int idx = startIdxToClear; idx < endIdxToClear; idx++) { + long base = entryBase(idx); + + int grpId = getGrpId(base); + long pageId = getPageId(base); + + if (isEmpty(grpId, pageId)) + continue; // absent value, no removal required + + if (!keyPred.test(grpId, pageId)) + continue; // not matched value, no removal required + + long valAt = getValue(base); + + setSize(size() - 1); + + doBackwardShift(idx); + + list.add(valAt); + + idx--; //Need recheck current cell because of backward shift + } + + return list; + } + + /** {@inheritDoc} */ + @Override public int capacity() { + return numBuckets; + } + + /** + * @param curr current selected index to store value. + * @param baseIdx base or ideal bucket to store entry value to avoid probing. + * @return distance between cells, or 0 if cell is ideal. + */ + private int distance(int curr, int baseIdx) { + int diff = curr - baseIdx; + + if (diff < 0) + return diff + numBuckets; + + return diff; + } + + /** + * @param grpId Cache group ID. + * @param pageId Page ID. + * @return {@code true} if group & page id indicates cell has state 'Empty'. + */ + private boolean isEmpty(int grpId, long pageId) { + return pageId == EMPTY_PAGE_ID && grpId == EMPTY_CACHE_GRP_ID; + } + + /** + * Sets cell value to be empty. + * + * @param addr entry base address. + */ + private void setEmpty(long addr) { + setPageId(addr, EMPTY_PAGE_ID); + setGrpId(addr, EMPTY_CACHE_GRP_ID); + setValue(addr, 0); + setIdealBucket(addr, 0); + setVersion(addr, 0); + } + + /** + * @param base Entry base, address in buffer of the entry start. + * @param idxIdeal number of ideal bucket (cell) to insert this value. + */ + private void setIdealBucket(long base, long idxIdeal) { + assert idxIdeal >= 0 && idxIdeal < numBuckets; + + putLong(base + IDEAL_BUCKET_OFFSET, idxIdeal); + } + + /** + * @return printable dump with all buckets state. + */ + public String dump() { + StringBuilder sb = new StringBuilder(); + + for (int idx = 0; idx < numBuckets; idx++) + dumpEntry(sb, idx); + + return sb.toString(); + } + + /** + * @param idx index of entry to dump + * @return string representation of bucket content. + */ + private String dumpEntry(int idx) { + StringBuilder sb = new StringBuilder(); + + dumpEntry(sb, idx); + + return sb.toString(); + } + + /** + * @param sb destination string builder to dump entry to. + * @param idx bucket index. + */ + private void dumpEntry(StringBuilder sb, int idx) { + long base = entryBase(idx); + int curGrpId = getGrpId(base); + long curPageId = getPageId(base); + long curVal = getValue(base); + long ver = getVersion(base); + + sb.append("slot [").append(idx).append("]:"); + + if (isEmpty(curGrpId, curPageId)) + sb.append("Empty: "); + + sb.append("i.buc=").append(getIdealBucket(base)).append(","); + sb.append("(grp=").append(curGrpId).append(","); + sb.append("page=").append(curPageId).append(")"); + sb.append("->"); + sb.append("(val=").append(curVal).append(","); + sb.append("ver=").append(ver).append(")"); + sb.append("\n"); + } + + /** + * @param base Entry base, address in buffer of the entry start. + * @param idealBucket number of ideal bucket (cell) to insert this value. + * @param grpId Entry key. Group ID to be stored in entry. + * @param pageId Entry key. Page ID to be stored. + * @param val Entry value associated with key. + * @param ver Entry version. + */ + private void setCellValue(long base, long idealBucket, int grpId, long pageId, long val, int ver) { + setIdealBucket(base, idealBucket); + setGrpId(base, grpId); + setPageId(base, pageId); + setValue(base, val); + setVersion(base, ver); + } + + /** + * @param base address of current cell. + * @return number of ideal bucket (cell) to store this value. + */ + private int getIdealBucket(long base) { + return getInt(base + IDEAL_BUCKET_OFFSET); + } + + /** + * @param base Address of current cell. + * @return Page ID saved in cell. + */ + private long getPageId(long base) { + return getLong(base + PAGE_ID_OFFSET); + } + + /** + * @param base Address of cell. + * @param pageId Page ID to set in current cell. + */ + private void setPageId(long base, long pageId) { + putLong(base + PAGE_ID_OFFSET, pageId); + } + + /** + * @param base Entry base address. + * @return Group ID stored in entry. + */ + private int getGrpId(long base) { + return getInt(base + GRP_ID_OFFSET); + } + + /** + * @param base Entry base address. + * @param grpId Group ID to be stored in entry. + */ + private void setGrpId(long base, int grpId) { + putInt(base + GRP_ID_OFFSET, grpId); + } + + /** + * @param base Bucket base address. + * @return value stored in bucket. + */ + private long getValue(long base) { + return getLong(base + VALUE_OFFSET); + } + + /** + * @param base Bucket base address. + * @param val Value to store in bucket. + */ + private void setValue(long base, long val) { + putLong(base + VALUE_OFFSET, val); + } + + /** + * @param base Bucket base address. + * @return Entry version associated with bucket. + */ + private int getVersion(long base) { + return getInt(base + VERSION_OFFSET); + } + + /** + * @param base Bucket base address. + * @param ver Entry version to set in bucket. + */ + private void setVersion(long base, int ver) { + putInt(base + VERSION_OFFSET, ver); + } + + /** {@inheritDoc} */ + @Override public final int size() { + return GridUnsafe.getInt(baseAddr + MAPSIZE_OFFSET); + } + + /** + * Changes collection size. + * + * @param sz new size to set. + */ + private void setSize(int sz) { + putInt(baseAddr + MAPSIZE_OFFSET, sz); + } + + /** {@inheritDoc} */ + @Override public void forEach(BiConsumer<FullPageId, Long> act) { + for (int i = 0; i < numBuckets; i++) { + if (isEmptyAt(i)) + continue; + + long base = entryBase(i); + + act.accept(getFullPageId(base), getValue(base)); + } + } + + /** + * @param base bucket base address. + * @return Key. Full page ID from bucket. + */ + @NotNull private FullPageId getFullPageId(long base) { + return new FullPageId(getPageId(base), getGrpId(base)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java index 316cb8b..76cd5bd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/checkpoint/IgniteMassLoadSandboxTest.java @@ -110,7 +110,7 @@ public class IgniteMassLoadSandboxTest extends GridCommonAbstractTest { DataRegionConfiguration regCfg = new DataRegionConfiguration() .setName("dfltMemPlc") .setMetricsEnabled(true) - .setMaxSize(256L * 1024 * 1024) + .setMaxSize(2 * 1024L * 1024 * 1024) .setPersistenceEnabled(true); DataStorageConfiguration dsCfg = new DataStorageConfiguration(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/FullPageIdTableTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/FullPageIdTableTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/FullPageIdTableTest.java index 1f29549..43b27aa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/FullPageIdTableTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/FullPageIdTableTest.java @@ -20,32 +20,45 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem; import java.util.HashMap; import java.util.Map; import java.util.Random; +import org.apache.ignite.internal.mem.DirectMemoryProvider; import org.apache.ignite.internal.mem.DirectMemoryRegion; import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; import org.apache.ignite.internal.pagemem.FullPageId; -import org.apache.ignite.internal.processors.cache.persistence.pagemem.FullPageIdTable; -import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.logger.java.JavaLogger; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; /** * */ -public class FullPageIdTableTest extends GridCommonAbstractTest { +public class FullPageIdTableTest { + /** */ + private static final int CACHE_ID_RANGE = 1; + + /** */ + private static final int PAGE_ID_RANGE = 3000; + /** */ - private static final int CACHE_ID_RANGE = 10; + private static final int CACHE_ID_RANGE2 = 1024; /** */ - private static final int PAGE_ID_RANGE = 1000; + private static final int PAGE_ID_RANGE2 = 3000; + + /** Logger. */ + private JavaLogger log = new JavaLogger(); /** * @throws Exception if failed. */ + @Test public void testRandomOperations() throws Exception { - long mem = FullPageIdTable.requiredMemory(CACHE_ID_RANGE * PAGE_ID_RANGE); + int cnt = CACHE_ID_RANGE * PAGE_ID_RANGE; + + long mem = FullPageIdTable.requiredMemory(cnt); - UnsafeMemoryProvider prov = new UnsafeMemoryProvider(new JavaLogger()); + UnsafeMemoryProvider prov = new UnsafeMemoryProvider(log); prov.initialize(new long[] {mem}); @@ -58,7 +71,7 @@ public class FullPageIdTableTest extends GridCommonAbstractTest { Random rnd = new Random(seed); - FullPageIdTable tbl = new FullPageIdTable(region.address(), region.size(), true); + LoadedPagesMap tbl = new FullPageIdTable(region.address(), region.size(), true); Map<FullPageId, Long> check = new HashMap<>(); @@ -77,7 +90,7 @@ public class FullPageIdTableTest extends GridCommonAbstractTest { check.put(fullId, val); } else { - tbl.remove(cacheId, pageId, 0); + tbl.remove(cacheId, pageId); check.remove(fullId); } @@ -93,23 +106,163 @@ public class FullPageIdTableTest extends GridCommonAbstractTest { } /** + * @throws Exception if failed. + */ + @Test + public void putRemoveScenario() throws Exception { + long seed = U.currentTimeMillis(); + + doPutRemoveTest(seed, false, 1_000_000); + } + + /** + * @throws Exception if failed. + */ + @Test + public void putRemoveScenarioNewMap() throws Exception { + long seed = U.currentTimeMillis(); + doPutRemoveTest(seed, true, 30_000_000); + } + + /** + * @param seed random initial value. + * @param newMapImpl use RobinHood map + * @param iters iterations. + */ + private void doPutRemoveTest(long seed, boolean newMapImpl, int iters) { + int elementsCnt = 7000; + + //System.setProperty(IGNITE_LONG_LONG_HASH_MAP_LOAD_FACTOR, "11"); + long mem = newMapImpl + ? RobinHoodBackwardShiftHashMap.requiredMemory(elementsCnt) + : FullPageIdTable.requiredMemory(elementsCnt); + + DirectMemoryProvider prov = new UnsafeMemoryProvider(log); + + prov.initialize(new long[] {mem}); + + DirectMemoryRegion region = prov.nextRegion(); + + try { + info("Seed: " + seed + "L; //"); + + Random rnd = new Random(seed); + + LoadedPagesMap tbl = + newMapImpl + ? new RobinHoodBackwardShiftHashMap(region.address(), region.size()) + : new FullPageIdTable(region.address(), region.size(), true); + + Map<FullPageId, Long> check = new HashMap<>(); + + int tag = 0; + for (int i = 0; i < iters; i++) { + int op = rnd.nextInt(5); + + int cacheId = rnd.nextInt(CACHE_ID_RANGE2) + 1; + int pageId = rnd.nextInt(PAGE_ID_RANGE2); + + FullPageId fullId = new FullPageId(pageId, cacheId); + + if (op == 0) { + long val = tbl.get(cacheId, pageId, tag, -1, -2); + if (val == -2) + tbl.refresh(cacheId, pageId, tag); + else { + Long checkVal = check.get(fullId); + + if (checkVal != null) { + assertEquals("Ret " + val + "Check " + checkVal, + checkVal.longValue(), val); + } + } + + } + else if ((op == 1 || op == 2) && (check.size() < elementsCnt)) { + long val = U.safeAbs(rnd.nextLong()); + + tbl.put(cacheId, pageId, val, tag); + check.put(fullId, val); + } + else if ((op == 3) && check.size() >= elementsCnt * 2 / 3) { + tbl.remove(cacheId, pageId); + check.remove(fullId); + } + else if (check.size() >= elementsCnt * 2 / 3) { + int idx = rnd.nextInt(tbl.capacity()); + ReplaceCandidate ec = tbl.getNearestAt(idx); + if (ec != null) { + FullPageId fullPageId = ec.fullId(); + + tbl.remove(fullPageId.groupId(), fullPageId.pageId()); + + check.remove(fullPageId); + } + } + + if (i > 0 && i % 100_000 == 0) { + info("Done: " + i + + " Size: " + check.size() + + " Capacity: " + tbl.capacity() ); + + verifyLinear(tbl, check); + + tag++; + } + + i++; + } + + verifyLinear(tbl, check); + } + finally { + long msPassed = U.currentTimeMillis() - seed; + System.err.println("Seed used [" + seed + "] duration ["+ msPassed+ "] ms"); + prov.shutdown(); + } + } + + /** + * @param msg Message to print. + */ + protected void info(String msg) { + if (log.isInfoEnabled()) + log.info(msg); + + System.out.println(msg); + } + + + /** * @param tbl Table to check. * @param check Expected mapping. */ - private void verifyLinear(FullPageIdTable tbl, Map<FullPageId, Long> check) { - final Map<FullPageId, Long> collector = new HashMap<>(); + private void verifyLinear(LoadedPagesMap tbl, Map<FullPageId, Long> check) { + final Map<FullPageId, Long> tblSnapshot = new HashMap<>(); - tbl.visitAll(new CI2<FullPageId, Long>() { - @Override public void apply(FullPageId fullId, Long val) { - if (collector.put(fullId, val) != null) - throw new AssertionError("Duplicate full page ID mapping: " + fullId); - } + tbl.forEach((fullId, val) -> { + if (tblSnapshot.put(fullId, val) != null) + throw new AssertionError("Duplicate full page ID mapping: " + fullId); }); - assertEquals("Size check failed", check.size(), collector.size()); + int chkSize = check.size(); + int foundTblSize = tblSnapshot.size(); - for (Map.Entry<FullPageId, Long> entry : check.entrySet()) - assertEquals("Mapping comparison failed for key: " + entry.getKey(), - entry.getValue(), collector.get(entry.getKey())); + HashMap<FullPageId, Long> cp = new HashMap<>(tblSnapshot); + check.keySet().forEach(cp::remove); + + assertEquals("Size check failed, check map size " + + chkSize + " but found in table " + foundTblSize + " elements," + + " table size " + tbl.size() + + "\n Difference: " + cp, chkSize, foundTblSize); + + for (Map.Entry<FullPageId, Long> entry : check.entrySet()) { + Long valCheck = entry.getValue(); + Long actual = tblSnapshot.get(entry.getKey()); + + if (!valCheck.equals(actual)) + assertEquals("Mapping comparison failed for key: " + entry.getKey(), + valCheck, actual); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/RobinHoodBackwardShiftHashMapTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/RobinHoodBackwardShiftHashMapTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/RobinHoodBackwardShiftHashMapTest.java new file mode 100644 index 0000000..da59145 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/RobinHoodBackwardShiftHashMapTest.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence.pagemem; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Random; +import java.util.function.Consumer; +import org.apache.ignite.internal.mem.IgniteOutOfMemoryException; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.jetbrains.annotations.NotNull; +import org.junit.Test; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_LONG_HASH_MAP_LOAD_FACTOR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Unit tests of {@link RobinHoodBackwardShiftHashMap} implementation. + */ +public class RobinHoodBackwardShiftHashMapTest { + + /** + * @param tester map test code + * @param cap required map capacity. + */ + private void withMap(Consumer<RobinHoodBackwardShiftHashMap> tester, int cap) { + long memSize = RobinHoodBackwardShiftHashMap.requiredMemoryByBuckets(cap); + long addr = GridUnsafe.allocateMemory(memSize); + + RobinHoodBackwardShiftHashMap map = new RobinHoodBackwardShiftHashMap(addr, memSize); + boolean success = false; + try { + tester.accept(map); + + success = true; + } + finally { + if (!success) + System.err.println(map.dump()); + + GridUnsafe.freeMemory(addr); + } + } + + /** + * @throws Exception If failed. + */ + @Test + public void testSimplestPutGet() throws Exception { + int cnt = 100; + withMap(map -> { + for (int i = 0; i < cnt; i++) { + int grpId = i + 1; + int val = grpId * grpId; + + assertSizeChanged("Unique put should be successful " + grpId, + map, () -> map.put(grpId, 1, val, 1)); + assertEquals(val, map.get(grpId, 1, 0, -1, -2)); + + assertSizeNotChanged("Duplicate put for " + grpId, + map, () -> map.put(grpId, 1, 1, 1)); + assertEquals(1, map.get(grpId, 1, 0, -1, -2)); + } + + assertEquals(cnt, map.size()); + } + , cnt); + } + + /** + * @throws Exception If failed. + */ + @Test(expected = IgniteOutOfMemoryException.class) + public void testSimplestOverflow() throws Exception { + withMap(map -> { + for (int i = 0; i < 10; i++) { + int grpId = i + 1; + int val = grpId * grpId; + assertSizeChanged("Unique put should be successful [" + grpId + "]", map, () -> map.put(grpId, 1, val, 1)); + + assertEquals(val, map.get(grpId, 1, 0, -1, -2)); + + assertSizeNotChanged("Duplicate put for " + grpId, map, () -> map.put(grpId, 1, 1, 1)); + assertEquals(1, map.get(grpId, 1, 0, -1, -2)); + } + + map.put(11, 1, 11, 1); + } + , 10); + } + + /** + * @param msg message to dump in case assertion failed. + * @param map page map to check. + * @param act action during which size is expected to be changed. + */ + private static void assertSizeChanged(String msg, LoadedPagesMap map, Runnable act) { + int size = map.size(); + act.run(); + int newSize = map.size(); + + assertNotEquals(msg, size, newSize); + } + + /** + * @param msg message to dump in case assertion failed. + * @param map page map to check. + * @param act action during which size is expected to constant. + */ + private static void assertSizeNotChanged(String msg, LoadedPagesMap map, Runnable act) { + int size = map.size(); + act.run(); + int newSize = map.size(); + + assertEquals(msg, size, newSize); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPutRemoveOnSamePlaces() throws Exception { + withMap(map -> { + doAddRemove(map); + + //fill with 1 space left; + for (int i = 0; i < 99; i++) { + int grpId = i + 1; + int val = grpId * grpId; + assertSizeChanged("Unique put should be successful " + grpId, map, + () -> map.put(grpId, 1, val, 1)); + } + + doAddRemove(map); + } + , 100); + } + + /** + * @param map tested map implementation + */ + private void doAddRemove(LoadedPagesMap map) { + for (int i = 0; i < 100; i++) { + int grpId = i + 1; + int val = grpId * grpId; + map.put(grpId, 1, val, 1); + assertEquals(val, map.get(grpId, 1, 0, -1, -2)); + + assertTrue(map.remove(grpId, 1)); + assertEquals(-1, map.get(grpId, 1, 0, -1, -2)); + } + } + + /** + * + */ + @Test + public void testCollisionOnRemove() { + Map<FullPageId, Long> ctrl = new LinkedHashMap<>(); + int cap = 10; + FullPageId baseId = new FullPageId(0, 1); + + withMap(map -> { + for (int i = 0; i < cap; i++) { + int grpId = i + 1; + int pageId = findPageIdForCollision(grpId, baseId, cap); + ctrl.put(new FullPageId(pageId, grpId), (long)grpId); + map.put(grpId, pageId, (long)grpId, 1); + } + for (FullPageId next : ctrl.keySet()) { + assertTrue(map.remove(next.groupId(), next.pageId())); + } + }, cap); + } + + /** + * @param grpId Group ID to use + * @param id Page to be placed to same bucket with + * @param cap map maximum cells. + * @return page ID to use in addition to provided {@code grpId} to reach collision. + */ + private int findPageIdForCollision(int grpId, FullPageId id, int cap) { + int bucket = U.safeAbs(id.hashCode()) % cap; + + for (int p = 0; p < 1_000_000; p++) { + if (U.safeAbs(FullPageId.hashCode(grpId, p)) % cap == bucket) + return p; + } + assertTrue(false); + return -1; + } + + /** + * + */ + @Test + public void testRandomOpsPutRemove() { + doPutRemoveTest(System.currentTimeMillis()); + } + + /** + * @param seed random seed, use timer to random run. + */ + private void doPutRemoveTest(long seed) { + System.setProperty(IGNITE_LONG_LONG_HASH_MAP_LOAD_FACTOR, "11"); + + int elementsCnt = 10_000; + + withMap(tbl -> { + Random rnd = new Random(seed); + + Map<FullPageId, Long> check = new HashMap<>(); + + int tag = 0; + for (int i = 0; i < 20_000_000; i++) { + int op = rnd.nextInt(5); + + int cacheId = rnd.nextInt(100) + 1; + int pageId = rnd.nextInt(100); + + FullPageId fullId = new FullPageId(pageId, cacheId); + + if (op == 0) { + long val = tbl.get(cacheId, pageId, tag, -1, -2); + if (val == -2) + tbl.refresh(cacheId, pageId, tag); + else { + Long checkVal = check.get(fullId); + + if (checkVal != null) { + assertEquals("Ret." + + getPageString(fullId) + + " tbl: " + val + " Check " + checkVal, + checkVal.longValue(), val); + } + } + } + else if ((op == 1 || op == 2) && (check.size() < elementsCnt)) { + long val = U.safeAbs(rnd.nextInt(30)); + + check.put(fullId, val); + + tbl.put(cacheId, pageId, val, tag); + } + else if ((op == 3) && check.size() >= elementsCnt * 2 / 3) { + tbl.remove(cacheId, pageId); + + check.remove(fullId); + } + else if (check.size() >= elementsCnt * 2 / 3) { + ReplaceCandidate ec = tbl.getNearestAt(rnd.nextInt(tbl.capacity())); + + if (ec != null) { + FullPageId fullPageId = ec.fullId(); + + tbl.remove(fullPageId.groupId(), fullPageId.pageId()); + + check.remove(fullPageId); + } + } + + i++; + } + + }, elementsCnt); + } + + /** + * @param fullId page ID. + * @return Printable string for page ID. + */ + @NotNull private String getPageString(FullPageId fullId) { + return "(grp=" + fullId.groupId() + "," + + "page=" + fullId.pageId() + ")"; + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPutAndCantGetOutdatedValue() throws Exception { + withMap(map -> { + //fill with 1 space left; + for (int i = 0; i < 99; i++) { + int ver = i; + int grpId = ver + 1; + int val = grpId * grpId; + map.put(grpId, 1, val, ver); + + assertEquals(val, map.get(grpId, 1, ver, -1, -2)); + + assertEquals(-2, map.get(grpId, 1, ver + 1, -1, -2)); + } + }, 100); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testPutAndRefreshValue() throws Exception { + withMap(map -> { + //fill with 1 space left; + for (int i = 0; i < 99; i++) { + int ver = i; + int grpId = ver + 1; + int val = grpId * grpId; + int pageId = 1; + map.put(grpId, pageId, val, ver); + + map.refresh(grpId, pageId, ver + 1); + + assertEquals(val, map.get(grpId, pageId, ver + 1, -1, -2)); + + } + + doAddRemove(map); + }, 100); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testClearAtWithControlMap3() throws Exception { + int cap = 100; + + doRemovalTests(cap, (grpId, pageId) -> { + int hc = Integer.hashCode(grpId) + 31 * Long.hashCode(pageId); + + return hc % 3 == 0; + }); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testClearAtWithControlMap7() throws Exception { + int cap = 100; + + doRemovalTests(cap, (grpId, pageId) -> { + int hc = Integer.hashCode(grpId) + 31 * Long.hashCode(pageId); + + return hc % 7 == 0; + }); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testClearAllWithControlMap() throws Exception { + int cap = 100; + + doRemovalTests(cap, (grpId, pageId) -> true); + } + + /** + * @param cap capacity of map + * @param pred predicate to filter entries to be removed. + */ + private void doRemovalTests(int cap, LoadedPagesMap.KeyPredicate pred) { + withMap(map -> { + Map<FullPageId, Long> check = new HashMap<>(); + + int elems = cap - 1; //fill with 1 space left; + for (int i = 0; i < elems; i++) { + int grpId = i + 1; + long val = grpId * grpId; + int pageId = 1; + + map.put(grpId, pageId, val, i); + + check.put(new FullPageId(pageId, grpId), val); + } + + int sz = map.size(); + + GridLongList list = map.removeIf(pred); + for (int i = 0; i < list.size(); i++) { + long val = list.get(i); + + assertTrue(val > 0); + } + + assertThat(sz - map.size(), Matchers.is(list.size())); + + Map<FullPageId, Long> res = new HashMap<>(); + + map.forEach(res::put); + + assertThat(map.size(), Matchers.is(map.size())); + + for (int i = 0; i < cap; i++) { + GridLongList list1 = map.removeIf(i, i + 1, pred); + + assertTrue(list1.isEmpty()); // clear should not be successful. + } + + check.keySet().removeIf(entry -> pred.test(entry.groupId(), entry.pageId())); + + MatcherAssert.assertThat(res, Matchers.is(check)); + + }, cap); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1f8ec79d/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java index 9906429..c022eea 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsUnitTestSuite.java @@ -16,8 +16,10 @@ */ package org.apache.ignite.testsuites; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.FullPageIdTableTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.IgnitePageMemReplaceDelayedWriteUnitTest; import org.apache.ignite.internal.processors.cache.persistence.pagemem.IgniteThrottlingUnitTest; +import org.apache.ignite.internal.processors.cache.persistence.pagemem.RobinHoodBackwardShiftHashMapTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -27,7 +29,10 @@ import org.junit.runners.Suite; @RunWith(Suite.class) @Suite.SuiteClasses({ IgniteThrottlingUnitTest.class, - IgnitePageMemReplaceDelayedWriteUnitTest.class + IgnitePageMemReplaceDelayedWriteUnitTest.class, + IgniteThrottlingUnitTest.class, + FullPageIdTableTest.class, + RobinHoodBackwardShiftHashMapTest.class }) public class IgnitePdsUnitTestSuite { }