ignite-db - add
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2596f7ca Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2596f7ca Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2596f7ca Branch: refs/heads/ignite-db-x-10884 Commit: 2596f7ca2518087473a154398fb1a40b4fb1e7fd Parents: 22f5bb4 Author: S.Vladykin <[email protected]> Authored: Mon Apr 18 03:57:41 2016 +0300 Committer: S.Vladykin <[email protected]> Committed: Mon Apr 18 03:57:41 2016 +0300 ---------------------------------------------------------------------- .../cache/database/tree/io/DataPageIO.java | 207 +++++++++++++++++-- .../query/h2/database/H2RowStore.java | 2 +- 2 files changed, 189 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2596f7ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java index ff3bfaf..37f78e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.database.tree.io; import java.nio.ByteBuffer; +import java.util.Arrays; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -43,7 +44,10 @@ public class DataPageIO extends PageIO { private static final int INDIRECT_CNT_OFF = DIRECT_CNT_OFF + 1; /** */ - private static final int ITEMS_OFF = INDIRECT_CNT_OFF + 1; + private static final int FIRST_ENTRY_OFF = INDIRECT_CNT_OFF + 1; + + /** */ + private static final int ITEMS_OFF = FIRST_ENTRY_OFF + 2; /** */ private static final int ITEM_SIZE = 2; @@ -75,6 +79,7 @@ public class DataPageIO extends PageIO { setDirectCount(buf, 0); setIndirectCount(buf, 0); setFreeSpace(buf, buf.capacity() - ITEMS_OFF); + setFirstEntryOffset(buf, buf.capacity()); } /** @@ -101,9 +106,37 @@ public class DataPageIO extends PageIO { return ITEM_SIZE + KV_LEN_SIZE + keySize + valSize + VER_SIZE; } + /** + * @param buf Buffer. + * @param dataOff Data offset. + * @param withItem Return entry size including item size. + * @return Entry size. + */ + private int getEntrySize(ByteBuffer buf, int dataOff, boolean withItem) { + int res = buf.getShort(dataOff) & 0xFFFF; + + if (!withItem) + res -= ITEM_SIZE; + + return res; + } + + /** + * @param buf Buffer. + * @param dataOff Entry data offset. + */ + private void setFirstEntryOffset(ByteBuffer buf, int dataOff) { + assert dataOff >= ITEMS_OFF + ITEM_SIZE && dataOff < buf.capacity() - KV_LEN_SIZE - VER_SIZE; + + buf.putShort(FIRST_ENTRY_OFF, (short)dataOff); + } - private static int getEntrySize(ByteBuffer buf, int dataOff) { - return 0; + /** + * @param buf Buffer. + * @return Entry data offset. + */ + private int getFirstEntryOffset(ByteBuffer buf) { + return buf.getShort(FIRST_ENTRY_OFF) & 0xFFFF; } /** @@ -261,6 +294,16 @@ public class DataPageIO extends PageIO { } /** + * @param dataOff Data offset. + * @return Direct item. + */ + private static short fromOffset(int dataOff) { + assert dataOff > ITEMS_OFF + ITEM_SIZE && dataOff < 65536: dataOff; + + return (short)dataOff; + } + + /** * @param indirectItem Indirect item. * @return Index of corresponding direct item. */ @@ -435,9 +478,18 @@ public class DataPageIO extends PageIO { if (cnt == 0) return; - int off = itemOffset(idx); + moveBytes(buf, itemOffset(idx), cnt * ITEM_SIZE, step * ITEM_SIZE); + } - // TODO + /** + * @param entrySize Entry size as returned by {@link #getEntrySize(int, int)}. + * @param firstOff First entry data offset. + * @param directCnt Direct items count. + * @param indirectCnt Indirect items count. + * @return {@code true} If there is enough space for the entry. + */ + private static boolean enoughSpaceForEntry(int entrySize, int firstOff, int directCnt, int indirectCnt) { + return ITEMS_OFF + ITEM_SIZE * (directCnt + indirectCnt) <= firstOff - entrySize; } /** @@ -458,26 +510,152 @@ public class DataPageIO extends PageIO { GridCacheVersion ver, int entrySize ) throws IgniteCheckedException { - if (entrySize >= buf.capacity() - ITEMS_OFF) + if (entrySize > buf.capacity() - ITEMS_OFF) // TODO span multiple data pages with a single large entry throw new IgniteException("Too big entry: " + key + " " + val); + int directCnt = getDirectCount(buf); + int indirectCnt = getIndirectCount(buf); + + int dataOff = getFirstEntryOffset(buf); + + // Compact if we do not have enough space. + if (!enoughSpaceForEntry(entrySize, dataOff, directCnt, indirectCnt)) { + dataOff = compactDataEntries(buf, directCnt); + + assert enoughSpaceForEntry(entrySize, dataOff, directCnt, indirectCnt); + } + + // Attempt to write data right before the first entry. + dataOff -= entrySize - ITEM_SIZE; + + writeRowData(coctx, buf, dataOff, entrySize, key, val, ver); + + setFirstEntryOffset(buf, dataOff); + int itemId = insertItem(buf, dataOff, directCnt, indirectCnt); - return 0;// TODO + assert check(itemId): itemId; + + return (byte)itemId; } /** * @param buf Buffer. * @param dataOff Data offset. + * @param directCnt Direct items count. + * @param indirectCnt Indirect items count. + * @return Item ID (insertion index). + */ + private int insertItem(ByteBuffer buf, int dataOff, int directCnt, int indirectCnt) { + if (indirectCnt > 0) { + // If the first indirect item is on correct place to become the last direct item, do the transition + // and insert the new item into the free slot which was referenced by this first indirect item. + short item = getItem(buf, directCnt); + + if (itemId(item) == directCnt) { + int directItemIdx = directItemIndex(item); + + setItem(buf, directCnt, getItem(buf, directItemIdx)); + setItem(buf, directItemIdx, fromOffset(dataOff)); + + setDirectCount(buf, directCnt + 1); + setIndirectCount(buf, indirectCnt - 1); + + return directItemIdx; + } + } + + // Move all the indirect items forward to make a free slot and insert new item at the end of direct items. + moveItems(buf, directCnt, indirectCnt, +1); + + setItem(buf, directCnt, fromOffset(dataOff)); + + setDirectCount(buf, directCnt + 1); + + return directCnt; + } + + /** + * @param buf Buffer. + * @param directCnt Direct items count. + * @return New first entry offset. + */ + private int compactDataEntries(ByteBuffer buf, int directCnt) { + assert check(directCnt): directCnt; + + int[] offs = new int[directCnt]; + + for (int i = 0; i < directCnt; i++) { + int off = toOffset(getItem(buf, i)); + + offs[i] = (off << 8) | i; // This way we'll be able to sort by offset using Arrays.sort(...). + } + + Arrays.sort(offs); + + // Move right all of the entries if possible to make the page as compact as possible to its tail. + int prevOff = buf.capacity(); + + for (int i = directCnt - 1; i >= 0; i--) { + int off = offs[i] >>> 8; + + assert off < prevOff: off; + + int entrySize = getEntrySize(buf, off, false); + + int delta = prevOff - (off + entrySize); + + if (delta != 0) { // Move right. + assert delta > 0: delta; + + moveBytes(buf, off, entrySize, delta); + + int itemId = offs[i] & 0xFF; + + off += delta; + + setItem(buf, itemId, fromOffset(off)); + } + + prevOff = off; + } + + return prevOff; + } + + /** + * @param buf Buffer. + * @param off Offset. + * @param cnt Count. + * @param step Step. + */ + private static void moveBytes(ByteBuffer buf, int off, int cnt, int step) { + assert step != 0: step; + + if (step > 0) { + for (int i = off + cnt - 1; i >= 0; i--) + buf.put(i + step, buf.get(i)); + } + else { + for (int i = off, end = off + cnt; i < end; i++) + buf.put(i + step, buf.get(i)); + } + } + + /** + * @param coctx Cache object context. + * @param buf Buffer. + * @param dataOff Data offset. + * @param entrySize Entry size as returned by {@link #getEntrySize(int, int)}. * @param key Key. * @param val Value. * @param ver Version. */ - public void writeRowDataInPlace( + public void writeRowData( CacheObjectContext coctx, ByteBuffer buf, int dataOff, - int keyValLen, + int entrySize, CacheObject key, CacheObject val, GridCacheVersion ver @@ -485,7 +663,7 @@ public class DataPageIO extends PageIO { try { buf.position(dataOff); - buf.putShort((short)keyValLen); + buf.putShort((short)entrySize); boolean written = key.putValue(buf, coctx); @@ -504,13 +682,4 @@ public class DataPageIO extends PageIO { buf.position(0); } } - - /** - * @param buf Buffer. - * @param dataOff Data offset. - * @return Key and value size. - */ - private static int getKeyValueSize(ByteBuffer buf, int dataOff) { - return buf.getShort(dataOff) & 0xFFFF; - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2596f7ca/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java index c91ffbb..58126b7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java @@ -167,7 +167,7 @@ public class H2RowStore { buf.position(dataOff); - // Skip key-value size. + // Skip entry size. buf.getShort(); CacheObject key = coctx.processor().toCacheObject(coctx, buf);
