ignite-db - fixes
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/afc76c8d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/afc76c8d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/afc76c8d Branch: refs/heads/ignite-db-x-10884 Commit: afc76c8daae645b9191358f9d5232b90071bce6b Parents: edb1692 Author: S.Vladykin <[email protected]> Authored: Fri Apr 29 22:36:46 2016 +0300 Committer: S.Vladykin <[email protected]> Committed: Fri Apr 29 22:36:46 2016 +0300 ---------------------------------------------------------------------- .../cache/database/tree/io/DataPageIO.java | 222 +++++++++++-------- .../IgniteDbSingleNodePutGetSelfTest.java | 75 ++++++- 2 files changed, 208 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/afc76c8d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java index 50a1c71..54df845 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java @@ -19,12 +19,15 @@ package org.apache.ignite.internal.processors.cache.database.tree.io; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.internal.SB; /** * Data pages IO. @@ -231,6 +234,67 @@ public class DataPageIO extends PageIO { /** * @param buf Buffer. + * @return String representation. + */ + public String printPageLayout(ByteBuffer buf) { + int directCnt = getDirectCount(buf); + int indirectCnt = getIndirectCount(buf); + + boolean valid = directCnt >= indirectCnt; + + SB b = new SB(); + + b.appendHex(PageIO.getPageId(buf)).a(" ["); + + for (int i = 0; i < directCnt; i++) { + if (i != 0) + b.a(", "); + + short item = getItem(buf, i); + + if (item < ITEMS_OFF || item >= buf.capacity()) + valid = false; + + b.a(item); + } + + b.a("]["); + + Set<Integer> set = new HashSet<>(); + + for (int i = directCnt; i < directCnt + indirectCnt; i++) { + if (i != directCnt) + b.a(", "); + + short item = getItem(buf, i); + + int itemId = itemId(item); + int directIdx = directItemIndex(item); + + if (!set.add(directIdx) || !set.add(itemId)) + valid = false; + + assert indirectItem(itemId, directIdx) == item; + + if (itemId < directCnt || directIdx < 0 || directIdx >= directCnt) + valid = false; + + if (i > directCnt && itemId(getItem(buf, i - 1)) >= itemId) + valid = false; + + + b.a(itemId).a('^').a(directIdx); + } + + b.a("]"); + + assert valid : b.toString(); + + return b.toString(); + } + + /** + * @param buf Buffer. * @param itemId Fixed item ID (the index used for referencing an entry from the outside). * @return Data entry offset in bytes. */ @@ -248,11 +312,12 @@ public class DataPageIO extends PageIO { int indirectItemIdx = findIndirectItemIndex(buf, itemId, directCnt, indirectCnt); - assert indirectItemIdx >= directCnt && indirectItemIdx < directCnt + indirectCnt: indirectCnt; + assert indirectItemIdx >= directCnt : indirectItemIdx + " " + directCnt; + assert indirectItemIdx < directCnt + indirectCnt: indirectItemIdx + " " + directCnt + " " + indirectCnt; itemId = directItemIndex(getItem(buf, indirectItemIdx)); - assert itemId >= 0 && itemId < directCnt: itemId; // Direct item must be here. + assert itemId >= 0 && itemId < directCnt: itemId + " " + directCnt + " " + indirectCnt; // Direct item. } return toOffset(getItem(buf, itemId)); @@ -336,40 +401,53 @@ public class DataPageIO extends PageIO { * Move the last direct item to the free slot and reference it with indirect item on the same place. * * @param buf Buffer. - * @param itemId Free slot. + * @param freeDirectIdx Free slot. * @param directCnt Direct items count. + * @param indirectCnt Indirect items count. + * @return {@code true} If the last direct item already had corresponding indirect item. */ - private static void moveLastItem(ByteBuffer buf, int itemId, int directCnt) { + private static boolean moveLastItem(ByteBuffer buf, int freeDirectIdx, int directCnt, int indirectCnt) { + int lastIndirectId = findIndirectIndexForLastDirect(buf, directCnt, indirectCnt); + int lastItemId = directCnt - 1; - setItem(buf, itemId, getItem(buf, lastItemId)); - setItem(buf, lastItemId, indirectItem(lastItemId, itemId)); + assert lastItemId != freeDirectIdx; + + short indirectItem = indirectItem(lastItemId, freeDirectIdx); + + assert itemId(indirectItem) == lastItemId && directItemIndex(indirectItem) == freeDirectIdx; + + setItem(buf, freeDirectIdx, getItem(buf, lastItemId)); + setItem(buf, lastItemId, indirectItem); + + assert getItem(buf, lastItemId) == indirectItem; + + if (lastIndirectId != -1) { // Fix pointer to direct item. + setItem(buf, lastIndirectId, indirectItem(itemId(getItem(buf, lastIndirectId)), freeDirectIdx)); + + return true; + } + + return false; } /** - * If we've moved the last item second time (it was already referenced by a indirect item), - * we need to fix the existing indirect item and the last one must be overwritten. - * * @param buf Buffer. * @param directCnt Direct items count. * @param indirectCnt Indirect items count. - * @return {@code true} If it was indirect item and it was fixed. + * @return Index of indirect item for the last direct item. */ - private static boolean fixIndirectItem(ByteBuffer buf, int directCnt, int indirectCnt) { - short item = getItem(buf, directCnt - 1); // Now it is a first indirect item after move. + private static int findIndirectIndexForLastDirect(ByteBuffer buf, int directCnt, int indirectCnt) { + int lastDirectId = directCnt - 1; - int itemId = itemId(item); + for (int i = directCnt, end = directCnt + indirectCnt; i < end; i++) { + short item = getItem(buf, i); - if (itemId == directCnt - 1) - return false; // It was a direct item initially, nothing to fix. - - // Find initial indirect item for moved last direct item. - int indirectItemIdx = findIndirectItemIndex(buf, itemId, directCnt, indirectCnt); - - // Now it points to a new place. - setItem(buf, indirectItemIdx, item); + if (directItemIndex(item) == lastDirectId) + return i; + } - return true; + return -1; } /** @@ -390,88 +468,55 @@ public class DataPageIO extends PageIO { (indirectCnt == 1 && itemId == itemId(getItem(buf, 1))) : itemId; setEmptyPage(buf); - - return; // TODO May be have a separate list of free pages? - } - - // Get the entry size before the actual remove. - int rmvEntrySize = getEntrySize(buf, getDataOffset(buf, itemId), false); - - if (itemId < directCnt) - removeDirectItem(buf, itemId, directCnt, indirectCnt); - else - removeIndirectItem(buf, itemId, directCnt, indirectCnt); - - // Increase free space. - setFreeSpace(buf, getFreeSpace(buf) + rmvEntrySize + - ITEM_SIZE * (directCnt - getDirectCount(buf) + indirectCnt - getIndirectCount(buf))); - } - - /** - * @param buf Buffer. - * @param itemId Item ID. - * @param directCnt Direct items count. - * @param indirectCnt Indirect items count. - */ - private void removeDirectItem(ByteBuffer buf, int itemId, int directCnt, int indirectCnt) { - if (itemId + 1 == directCnt) { - // It is the last direct item. - setDirectCount(buf, directCnt - 1); - - if (indirectCnt > 0) - moveItems(buf, directCnt, indirectCnt, -1); } else { - // Remove from the middle of direct items. - moveLastItem(buf, itemId, directCnt); + // Get the entry size before the actual remove. + int rmvEntrySize = getEntrySize(buf, getDataOffset(buf, itemId), false); - setDirectCount(buf, directCnt - 1); - setIndirectCount(buf, indirectCnt + 1); - } - } + int indirectId = 0; - /** - * @param buf Buffer. - * @param itemId Item ID. - * @param directCnt Direct items count. - * @param indirectCnt Indirect items count. - */ - private void removeIndirectItem(ByteBuffer buf, int itemId, int directCnt, int indirectCnt) { - // Need to remove indirect item. - assert indirectCnt > 0: indirectCnt; // Must have indirect items here. + if (itemId >= directCnt) { // Need to remove indirect item. + assert indirectCnt > 0; - // Need to found indirect and direct indexes for the given item ID. - int indirectItemIdx = findIndirectItemIndex(buf, itemId, directCnt, indirectCnt); + indirectId = findIndirectItemIndex(buf, itemId, directCnt, indirectCnt); - int allItemsCnt = directCnt + indirectCnt; + assert indirectId >= directCnt; - assert indirectItemIdx >= directCnt && indirectItemIdx < allItemsCnt: indirectCnt; + itemId = directItemIndex(getItem(buf, indirectId)); - itemId = directItemIndex(getItem(buf, indirectItemIdx)); + assert itemId < directCnt; + } - assert itemId < directCnt: itemId; // Direct item index. + boolean dropLast = true; - boolean indirectFixed = true; // By default is true because for the last item it represents the same invariant. + if (itemId + 1 < directCnt) // It is not the last direct item. + dropLast = moveLastItem(buf, itemId, directCnt, indirectCnt); - if (itemId + 1 != directCnt) { // Additional handling for a middle item. - moveLastItem(buf, itemId, directCnt); + if (indirectId == 0) {// For the last direct item with no indirect item. + if (dropLast) + moveItems(buf, directCnt, indirectCnt, -1); + else + indirectCnt++; + } + else { + if (dropLast) + moveItems(buf, directCnt, indirectId - directCnt, -1); - indirectFixed = fixIndirectItem(buf, directCnt, indirectCnt); - } + moveItems(buf, indirectId + 1, directCnt + indirectCnt - indirectId - 1, dropLast ? -2 : -1); - // Move everything before indirect item 1 step back. - // If the last item was a direct, then we have to keep it because it became indirect. - if (indirectFixed) - moveItems(buf, directCnt, indirectItemIdx - directCnt, -1); + if (dropLast) + indirectCnt--; + } - // Move everything after the found indirect index 2 step back (or 1 in case when the last was a direct item). - moveItems(buf, indirectItemIdx + 1, allItemsCnt - indirectItemIdx - 1, indirectFixed ? -2 : -1); + setIndirectCount(buf, indirectCnt); + setDirectCount(buf, directCnt - 1); - if (indirectFixed) // Otherwise we've added one and removed one indirect item. - setIndirectCount(buf, indirectCnt - 1); + assert getIndirectCount(buf) <= getDirectCount(buf); - // We always remove one direct item here. - setDirectCount(buf, directCnt - 1); + // Increase free space. + setFreeSpace(buf, getFreeSpace(buf) + rmvEntrySize + + ITEM_SIZE * (directCnt - getDirectCount(buf) + indirectCnt - getIndirectCount(buf))); + } } /** @@ -552,6 +597,7 @@ public class DataPageIO extends PageIO { int itemId = insertItem(buf, dataOff, directCnt, indirectCnt); assert check(itemId): itemId; + assert getIndirectCount(buf) <= getDirectCount(buf); // Update free space. If number of direct items did not change, then we were able to reuse item slot. setFreeSpace(buf, getFreeSpace(buf) - entrySizeWithItem + (getDirectCount(buf) == directCnt ? ITEM_SIZE : 0)); http://git-wip-us.apache.org/repos/asf/ignite/blob/afc76c8d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbSingleNodePutGetSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbSingleNodePutGetSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbSingleNodePutGetSelfTest.java index 99dc937..0a6c1c9 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbSingleNodePutGetSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/IgniteDbSingleNodePutGetSelfTest.java @@ -58,7 +58,7 @@ public class IgniteDbSingleNodePutGetSelfTest extends GridCommonAbstractTest { dbCfg.setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4); - dbCfg.setPageSize(256); + dbCfg.setPageSize(1024); dbCfg.setPageCacheSize(100 * 1024 * 1024); @@ -110,6 +110,79 @@ public class IgniteDbSingleNodePutGetSelfTest extends GridCommonAbstractTest { } /** + * + */ + public void testRandomRemove() { + IgniteEx ig = grid(0); + + IgniteCache<Integer, DbValue> cache = ig.cache(null); + + final int cnt = 50_000; + + long seed = System.nanoTime(); + + X.println("Seed: " + seed); + + Random rnd = new GridRandom(seed); + + int[] keys = generateUniqueRandomKeys(cnt, rnd); + + X.println("Put start"); + + for (int i : keys) { + DbValue v0 = new DbValue(i, "test-value", i); + +// if (i % 1000 == 0) +// X.println(" --> " + i); + + cache.put(i, v0); + + assertEquals(v0, cache.get(i)); + } + + keys = generateUniqueRandomKeys(cnt, rnd); + + X.println("Rmv start"); + + for (int i : keys) { +// X.println(" --> " + i); + + assertTrue(cache.remove(i)); + } + } + + + /** + */ + public void testRandomPut() { + IgniteEx ig = grid(0); + + IgniteCache<Integer, DbValue> cache = ig.cache(null); + + final int cnt = 1_000; + + long seed = System.nanoTime(); + + X.println("Seed: " + seed); + + Random rnd = new GridRandom(seed); + + for (int i = 0; i < 500_000; i++) { + int k = rnd.nextInt(cnt); + + DbValue v0 = new DbValue(k, "test-value " + k, i); + + if (i % 1000 == 0) + X.println(" --> " + i); + + cache.put(k, v0); + + assertEquals(v0, cache.get(k)); + } + } + + + /** * @throws Exception if failed. */ public void testPutGetSimple() throws Exception {
