ignite-db - remove
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/22f5bb42 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/22f5bb42 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/22f5bb42 Branch: refs/heads/ignite-db-x-10884 Commit: 22f5bb4203ec365ad990060e78eff19b73b4e7a1 Parents: 3a7e377 Author: S.Vladykin <[email protected]> Authored: Sun Apr 17 21:57:33 2016 +0300 Committer: S.Vladykin <[email protected]> Committed: Sun Apr 17 21:57:33 2016 +0300 ---------------------------------------------------------------------- .../cache/database/tree/io/DataPageIO.java | 453 +++++++++++++++---- .../query/h2/database/H2RowStore.java | 42 +- .../query/h2/database/freelist/FreeList.java | 2 +- 3 files changed, 388 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/22f5bb42/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java index 8c535eb..ff3bfaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java @@ -34,20 +34,26 @@ public class DataPageIO extends PageIO { ); /** */ - private static final int OCCUPIED_SIZE_OFF = COMMON_HEADER_END; + private static final int FREE_SPACE_OFF = COMMON_HEADER_END; /** */ - private static final int ALL_CNT_OFF = OCCUPIED_SIZE_OFF + 2; + private static final int DIRECT_CNT_OFF = FREE_SPACE_OFF + 2; /** */ - private static final int LIVE_CNT_OFF = ALL_CNT_OFF + 2; + private static final int INDIRECT_CNT_OFF = DIRECT_CNT_OFF + 1; /** */ - private static final int ITEMS_OFF = LIVE_CNT_OFF + 2; + private static final int ITEMS_OFF = INDIRECT_CNT_OFF + 1; /** */ private static final int ITEM_SIZE = 2; + /** */ + private static final int KV_LEN_SIZE = 2; // TODO entry will span multiple pages?? + + /** */ + private static final int VER_SIZE = 24; + /** * @param ver Page format version. */ @@ -59,163 +65,405 @@ public class DataPageIO extends PageIO { @Override public void initNewPage(ByteBuffer buf, long pageId) { super.initNewPage(buf, pageId); - setAllCount(buf, 0); - setLiveCount(buf, 0); - setOccupiedSize(buf, 0); + setEmptyPage(buf); } - public int getOccupiedSize(ByteBuffer buf) { - return buf.getShort(OCCUPIED_SIZE_OFF) & 0xFFFF; + /** + * @param buf Buffer. + */ + private void setEmptyPage(ByteBuffer buf) { + setDirectCount(buf, 0); + setIndirectCount(buf, 0); + setFreeSpace(buf, buf.capacity() - ITEMS_OFF); } - public void setOccupiedSize(ByteBuffer buf, int size) { - buf.putShort(OCCUPIED_SIZE_OFF, (short)size); + /** + * @param coctx Cache object context. + * @param key Key. + * @param val Value. + * @return Entry size on page. + * @throws IgniteCheckedException If failed. + */ + public static int getEntrySize(CacheObjectContext coctx, CacheObject key, CacheObject val) + throws IgniteCheckedException { + int keyLen = key.valueBytesLength(coctx); + int valLen = val.valueBytesLength(coctx); + + return getEntrySize(keyLen, valLen); + } - assert getOccupiedSize(buf) == size; + /** + * @param keySize Key size. + * @param valSize Value size. + * @return Entry size including item. + */ + private static int getEntrySize(int keySize, int valSize) { + return ITEM_SIZE + KV_LEN_SIZE + keySize + valSize + VER_SIZE; } - public int getAllCount(ByteBuffer buf) { - return buf.getShort(ALL_CNT_OFF) & 0xFFFF; + + private static int getEntrySize(ByteBuffer buf, int dataOff) { + return 0; } - public void setAllCount(ByteBuffer buf, int cnt) { - buf.putShort(ALL_CNT_OFF, (short)cnt); + /** + * @param buf Buffer. + * @param freeSpace Free space. + */ + private void setFreeSpace(ByteBuffer buf, int freeSpace) { + assert freeSpace >= 0 && freeSpace <= Short.MAX_VALUE; - assert cnt == getAllCount(buf); + buf.putShort(FREE_SPACE_OFF, (short)freeSpace); } - public int getLiveCount(ByteBuffer buf) { - return buf.getShort(LIVE_CNT_OFF) & 0xFFFF; + /** + * @param buf Buffer. + * @return Free space. + */ + public int getFreeSpace(ByteBuffer buf) { + return buf.getShort(FREE_SPACE_OFF); } - public void setLiveCount(ByteBuffer buf, int cnt) { - buf.putShort(LIVE_CNT_OFF, (short)cnt); + /** + * @param buf Buffer. + * @param cnt Direct count. + */ + private void setDirectCount(ByteBuffer buf, int cnt) { + assert check(cnt): cnt; + + buf.put(DIRECT_CNT_OFF, (byte)cnt); + } - assert cnt == getLiveCount(buf); + /** + * @param buf Buffer. + * @return Direct count. + */ + private int getDirectCount(ByteBuffer buf) { + return buf.get(DIRECT_CNT_OFF) & 0xFF; } - public boolean canAddEntry(ByteBuffer buf, int entrySize) { - int free = buf.capacity() - ITEMS_OFF - getOccupiedSize(buf); + /** + * @param buf Buffer. + * @param cnt Indirect count. + */ + private void setIndirectCount(ByteBuffer buf, int cnt) { + assert check(cnt): cnt; - if (free < entrySize) - return false; + buf.put(INDIRECT_CNT_OFF, (byte)cnt); + } - free -= (getAllCount(buf) - getLiveCount(buf)) * ITEM_SIZE; + /** + * @param idx Index. + * @return {@code true} If the index is valid. + */ + public static boolean check(int idx) { + return idx >= 0 && idx < 256; + } - return free >= entrySize; + /** + * @param buf Buffer. + * @return Indirect count. + */ + private int getIndirectCount(ByteBuffer buf) { + return buf.get(INDIRECT_CNT_OFF) & 0xFF; } /** - * @param keySize Key size. - * @param valSize Value size. - * @return Entry size including item. + * @param buf Buffer. + * @param itemId Fixed item ID (the index used for referencing an entry from the outside). + * @param directCnt Direct items count. + * @param indirectCnt Indirect items count. + * @return Found index of indirect item. */ - private static int entrySize(int keySize, int valSize) { - return ITEM_SIZE + 2/*key+val len*/ + keySize + valSize + 24/*ver*/; + private static int findIndirectItemIndex(ByteBuffer buf, int itemId, int directCnt, int indirectCnt) { + int low = directCnt; + int high = directCnt + indirectCnt - 1; + + while (low <= high) { + int mid = (low + high) >>> 1; + + int cmp = Integer.compare(itemId(getItem(buf, mid)), itemId); + + if (cmp < 0) + low = mid + 1; + else if (cmp > 0) + high = mid - 1; + else + return mid; // found + } + + throw new IllegalStateException("Item not found: " + itemId); } /** - * @param idx Index of item. - * @return Offset in bytes. + * @param buf Buffer. + * @param itemId Fixed item ID (the index used for referencing an entry from the outside). + * @return Data entry offset in bytes. */ - private static int offset(int idx) { - return ITEMS_OFF + idx * ITEM_SIZE; + public int getDataOffset(ByteBuffer buf, int itemId) { + assert check(itemId): itemId; + + int directCnt = getDirectCount(buf); + + assert directCnt > 0: directCnt; + + if (itemId >= directCnt) { // Need to do indirect lookup. + int indirectCnt = getIndirectCount(buf); + + assert indirectCnt > 0: indirectCnt; // Must have indirect items here. + + int indirectItemIdx = findIndirectItemIndex(buf, itemId, directCnt, indirectCnt); + + assert indirectItemIdx >= directCnt && indirectItemIdx < directCnt + indirectCnt: indirectCnt; + + itemId = directItemIndex(getItem(buf, indirectItemIdx)); + } + + assert itemId >= 0 && itemId < directCnt: itemId; // Direct item must be here. + + return toOffset(getItem(buf, itemId)); } /** * @param buf Buffer. - * @param idx Index of item. - * @return Data offset in bytes. + * @param idx Item index. + * @return Item. */ - public int getDataOffset(ByteBuffer buf, int idx) { - return buf.getShort(offset(idx)) & 0xFFFF; + private static short getItem(ByteBuffer buf, int idx) { + return buf.getShort(itemOffset(idx)); } /** * @param buf Buffer. - * @param idx Index of item. - * @param dataOff Data offset in bytes. + * @param idx Item index. + * @param item Item. + */ + private static void setItem(ByteBuffer buf, int idx, short item) { + buf.putShort(itemOffset(idx), item); + } + + /** + * @param idx Index of the item. + * @return Offset in buffer. */ - private void setDataOffset(ByteBuffer buf, int idx, int dataOff) { - buf.putShort(offset(idx), (short)dataOff); + private static int itemOffset(int idx) { + assert check(idx): idx; - assert dataOff == getDataOffset(buf, idx); + return ITEMS_OFF + idx * ITEM_SIZE; + } + + /** + * @param directItem Direct item. + * @return Offset of an entry payload inside of the page. + */ + private static int toOffset(short directItem) { + return directItem & 0xFFFF; } /** - * Make a window for data entry. + * @param indirectItem Indirect item. + * @return Index of corresponding direct item. + */ + private static int directItemIndex(short indirectItem) { + return indirectItem & 0xFF; + } + + /** + * @param indirectItem Indirect item. + * @return Fixed item ID (the index used for referencing an entry from the outside). + */ + private static int itemId(short indirectItem) { + return indirectItem >>> 8; + } + + /** + * @param itemId Fixed item ID (the index used for referencing an entry from the outside). + * @param directItemIdx Index of corresponding direct item. + * @return Indirect item. + */ + private static short indirectItem(int itemId, int directItemIdx) { + assert check(itemId): itemId; + assert check(directItemIdx): directItemIdx; + + return (short)((itemId << 8) | directItemIdx); + } + + /** + * Move the last direct item to the free slot and reference it with indirect item on the same place. + * + * @param buf Buffer. + * @param itemId Free slot. + * @param directCnt Direct items count. + */ + private static void moveLastItem(ByteBuffer buf, int itemId, int directCnt) { + int lastItemId = directCnt - 1; + + setItem(buf, itemId, getItem(buf, lastItemId)); + setItem(buf, lastItemId, indirectItem(lastItemId, itemId)); + } + + /** + * If we've moved the last item second time (it was already referenced by a indirect item), + * we need to fix the existing indirect item and the last one must be overwritten. * * @param buf Buffer. - * @param idx Index of the new item. - * @param allCnt All count. - * @param entrySize Entry size. - * @return Data offset for the new entry. + * @param directCnt Direct items count. + * @param indirectCnt Indirect items count. + * @return {@code true} If it was indirect item and it was fixed. + */ + private static boolean fixIndirectItem(ByteBuffer buf, int directCnt, int indirectCnt) { + short item = getItem(buf, directCnt - 1); // Now it is a first indirect item after move. + + int itemId = itemId(item); + + if (itemId == directCnt - 1) + return false; // It was a direct item initially, nothing to fix. + + // Find initial indirect item for moved last direct item. + int indirectItemIdx = findIndirectItemIndex(buf, itemId, directCnt, indirectCnt); + + // Now it points to a new place. + setItem(buf, indirectItemIdx, item); + + return true; + } + + /** + * @param buf Buffer. + * @param itemId Fixed item ID (the index used for referencing an entry from the outside). */ - public int makeWindow(ByteBuffer buf, int idx, int allCnt, int entrySize) { - if (idx == allCnt) { // Adding to the end of items. - int off = offset(idx); - int lastDataOff = allCnt == 0 ? buf.capacity() : getDataOffset(buf, allCnt - 1); + public void removeRow(ByteBuffer buf, int itemId) { + assert check(itemId) : itemId; + + int directCnt = getDirectCount(buf); + int indirectCnt = getIndirectCount(buf); + + assert directCnt > 0 : directCnt; // Direct count always represents overall number of live items. + + // Remove the last item on the page. + if (directCnt == 1) { + assert (indirectCnt == 0 && itemId == 0) || + (indirectCnt == 1 && itemId == itemId(getItem(buf, 1))) : itemId; + + setEmptyPage(buf); + + return; // TODO May be have a separate list of free pages? + } + + if (itemId < directCnt) + removeDirectItem(buf, itemId, directCnt, indirectCnt); + else + removeIndirectItem(buf, itemId, directCnt, indirectCnt); + } - if (lastDataOff - off < entrySize) // TODO try to defragment - return -1; + /** + * @param buf Buffer. + * @param itemId Item ID. + * @param directCnt Direct items count. + * @param indirectCnt Indirect items count. + */ + private void removeDirectItem(ByteBuffer buf, int itemId, int directCnt, int indirectCnt) { + if (itemId + 1 == directCnt) { + // It is the last direct item. + setDirectCount(buf, directCnt - 1); - return lastDataOff - entrySize + ITEM_SIZE; + if (indirectCnt > 0) + moveItems(buf, directCnt, indirectCnt, -1); } else { - //TODO defragment page with respect to idx and entrySize (if idx is not last, the window must be not first) - throw new UnsupportedOperationException(); + // Remove from the middle of direct items. + moveLastItem(buf, itemId, directCnt); + + setDirectCount(buf, directCnt - 1); + setIndirectCount(buf, indirectCnt + 1); } } - public int addRow( - CacheObjectContext coctx, - ByteBuffer buf, - CacheObject key, - CacheObject val, - GridCacheVersion ver) throws IgniteCheckedException - { - int keyLen = key.valueBytesLength(coctx); - int valLen = val.valueBytesLength(coctx); - int entrySize = entrySize(keyLen, valLen); + /** + * @param buf Buffer. + * @param itemId Item ID. + * @param directCnt Direct items count. + * @param indirectCnt Indirect items count. + */ + private void removeIndirectItem(ByteBuffer buf, int itemId, int directCnt, int indirectCnt) { + // Need to remove indirect item. + assert indirectCnt > 0: indirectCnt; // Must have indirect items here. - if (entrySize >= buf.capacity() - ITEMS_OFF) - throw new IgniteException("Too big entry: " + keyLen + " " + valLen); + // Need to found indirect and direct indexes for the given item ID. + int indirectItemIdx = findIndirectItemIndex(buf, itemId, directCnt, indirectCnt); - if (!canAddEntry(buf, entrySize)) - return -1; + int allItemsCnt = directCnt + indirectCnt; - int liveCnt = getLiveCount(buf); - int allCnt = getAllCount(buf); - int idx = 0; + assert indirectItemIdx >= directCnt && indirectItemIdx < allItemsCnt: indirectCnt; - if (allCnt == liveCnt) - idx = allCnt; // Allocate new idx at allCnt if all are alive. - else { - // Lookup for a free parking lot. - while (idx < allCnt) { - if (getDataOffset(buf, idx) == 0) - break; + itemId = directItemIndex(getItem(buf, indirectItemIdx)); + + assert itemId < directCnt: itemId; // Direct item index. + + boolean indirectFixed = true; // By default is true because for the last item it represents the same invariant. - idx++; - } + if (itemId + 1 != directCnt) { // Additional handling for a middle item. + moveLastItem(buf, itemId, directCnt); + + indirectFixed = fixIndirectItem(buf, directCnt, indirectCnt); } - int dataOff = makeWindow(buf, idx, allCnt, entrySize); + // Move everything before indirect item 1 step back. + // If the last item was a direct, then we have to keep it because it became indirect. + if (indirectFixed) + moveItems(buf, directCnt, indirectItemIdx - directCnt, -1); + + // Move everything after the found indirect index 2 step back (or 1 in case when the last was a direct item). + moveItems(buf, indirectItemIdx + 1, allItemsCnt - indirectItemIdx - 1, indirectFixed ? -2 : -1); + + if (indirectFixed) // Otherwise we've added one and removed one indirect item. + setIndirectCount(buf, indirectCnt - 1); + + // We always remove one direct item here. + setDirectCount(buf, directCnt - 1); + } - if (dataOff == -1) - return -1; + /** + * @param buf Buffer. + * @param idx Index. + * @param cnt Count. + * @param step Step. + */ + private static void moveItems(ByteBuffer buf, int idx, int cnt, int step) { + assert cnt >= 0: cnt; - // Write data. - writeRowDataInPlace(coctx, buf, dataOff, keyLen + valLen, key, val, ver); - // Write item. - setDataOffset(buf, idx, dataOff); + if (cnt == 0) + return; - // Update header. - setOccupiedSize(buf, getOccupiedSize(buf) + entrySize); - setAllCount(buf, allCnt + 1); - setLiveCount(buf, liveCnt + 1); + int off = itemOffset(idx); - return idx; + // TODO + } + + /** + * @param coctx Cache object context. + * @param buf Buffer. + * @param key Key. + * @param val Value. + * @param ver Version. + * @param entrySize Entry size as returned by {@link #getEntrySize(int, int)}. + * @return Item ID. + * @throws IgniteCheckedException If failed. + */ + public byte addRow( + CacheObjectContext coctx, + ByteBuffer buf, + CacheObject key, + CacheObject val, + GridCacheVersion ver, + int entrySize + ) throws IgniteCheckedException { + if (entrySize >= buf.capacity() - ITEMS_OFF) + throw new IgniteException("Too big entry: " + key + " " + val); + + + + return 0;// TODO } /** @@ -257,7 +505,12 @@ public class DataPageIO extends PageIO { } } - public int getKeyValueSize(ByteBuffer buf, int idx) { - return buf.getShort(getDataOffset(buf, idx)) & 0xFFFF; + /** + * @param buf Buffer. + * @param dataOff Data offset. + * @return Key and value size. + */ + private static int getKeyValueSize(ByteBuffer buf, int dataOff) { + return buf.getShort(dataOff) & 0xFFFF; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/22f5bb42/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java index 835e359..c91ffbb 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java @@ -65,9 +65,9 @@ public class H2RowStore { /** */ private final PageHandler<GridH2Row> writeRow = new PageHandler<GridH2Row>() { @Override public int run(Page page, ByteBuffer buf, GridH2Row row, int ignore) throws IgniteCheckedException { - DataPageIO io = DataPageIO.VERSIONS.forPage(buf); + int entrySize = DataPageIO.getEntrySize(coctx, row.key, row.val); - int entrySize = DataPageIO.entrySize(coctx, row.key, row.val); + DataPageIO io = DataPageIO.VERSIONS.forPage(buf); int idx = io.addRow(coctx, buf, row.key, row.val, row.ver, entrySize); @@ -81,6 +81,19 @@ public class H2RowStore { } }; + /** */ + private final PageHandler<Void> removeRow = new PageHandler<Void>() { + @Override public int run(Page page, ByteBuffer buf, Void ignore, int itemId) throws IgniteCheckedException { + DataPageIO io = DataPageIO.VERSIONS.forPage(buf); + + assert DataPageIO.check(itemId): itemId; + + io.removeRow(buf, (byte)itemId); + + return 0; + } + }; + /** * @param rowDesc Row descriptor. * @param cctx Cache context. @@ -118,12 +131,25 @@ public class H2RowStore { } /** + * @param link Row link. + * @throws IgniteCheckedException If failed. + */ + public void removeRow(long link) throws IgniteCheckedException { + assert link != 0; + + try (Page page = page(pageId(link))) { + writePage(page, removeRow, null, dwordsOffset(link), 0); + } + } + + /** * !!! This method must be invoked in read or write lock of referring index page. It is needed to * !!! make sure that row at this link will be invisible, when the link will be removed from * !!! from all the index pages, so that row can be safely erased from the data page. * * @param link Link. * @return Row. + * @throws IgniteCheckedException If failed. */ public GridH2Row getRow(long link) throws IgniteCheckedException { try (Page page = page(pageId(link))) { @@ -154,22 +180,22 @@ public class H2RowStore { GridCacheVersion ver = new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order); - GridH2Row res; + GridH2Row row; try { - res = rowDesc.createRow(key, PageIdUtils.partId(link), val, ver, 0); + row = rowDesc.createRow(key, PageIdUtils.partId(link), val, ver, 0); - res.link = link; + row.link = link; } catch (IgniteCheckedException e) { throw new IgniteException(e); } - assert res.ver != null; + assert row.ver != null; - rowDesc.cache(res); + rowDesc.cache(row); - return res; + return row; } finally { page.releaseRead(); http://git-wip-us.apache.org/repos/asf/ignite/blob/22f5bb42/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/FreeList.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/FreeList.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/FreeList.java index 25c94da..dfde231 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/FreeList.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/freelist/FreeList.java @@ -133,7 +133,7 @@ public class FreeList { public void writeRowData(GridH2Row row) throws IgniteCheckedException { assert row.link == 0; - int entrySize = DataPageIO.entrySize(cctx.cacheObjectContext(), row.key, row.val); + int entrySize = DataPageIO.getEntrySize(cctx.cacheObjectContext(), row.key, row.val); assert entrySize > 0 && entrySize < Short.MAX_VALUE: entrySize;
