ignite-db - add

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2596f7ca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2596f7ca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2596f7ca

Branch: refs/heads/ignite-db-x-10884
Commit: 2596f7ca2518087473a154398fb1a40b4fb1e7fd
Parents: 22f5bb4
Author: S.Vladykin <[email protected]>
Authored: Mon Apr 18 03:57:41 2016 +0300
Committer: S.Vladykin <[email protected]>
Committed: Mon Apr 18 03:57:41 2016 +0300

----------------------------------------------------------------------
 .../cache/database/tree/io/DataPageIO.java      | 207 +++++++++++++++++--
 .../query/h2/database/H2RowStore.java           |   2 +-
 2 files changed, 189 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2596f7ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
index ff3bfaf..37f78e6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.database.tree.io;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -43,7 +44,10 @@ public class DataPageIO extends PageIO {
     private static final int INDIRECT_CNT_OFF = DIRECT_CNT_OFF + 1;
 
     /** */
-    private static final int ITEMS_OFF = INDIRECT_CNT_OFF + 1;
+    private static final int FIRST_ENTRY_OFF = INDIRECT_CNT_OFF + 1;
+
+    /** */
+    private static final int ITEMS_OFF = FIRST_ENTRY_OFF + 2;
 
     /** */
     private static final int ITEM_SIZE = 2;
@@ -75,6 +79,7 @@ public class DataPageIO extends PageIO {
         setDirectCount(buf, 0);
         setIndirectCount(buf, 0);
         setFreeSpace(buf, buf.capacity() - ITEMS_OFF);
+        setFirstEntryOffset(buf, buf.capacity());
     }
 
     /**
@@ -101,9 +106,37 @@ public class DataPageIO extends PageIO {
         return ITEM_SIZE + KV_LEN_SIZE + keySize + valSize + VER_SIZE;
     }
 
+    /**
+     * @param buf Buffer.
+     * @param dataOff Data offset.
+     * @param withItem Return entry size including item size.
+     * @return Entry size.
+     */
+    private int getEntrySize(ByteBuffer buf, int dataOff, boolean withItem) {
+        int res = buf.getShort(dataOff) & 0xFFFF;
+
+        if (!withItem)
+            res -= ITEM_SIZE;
+
+        return res;
+    }
+
+    /**
+     * @param buf Buffer.
+     * @param dataOff Entry data offset.
+     */
+    private void setFirstEntryOffset(ByteBuffer buf, int dataOff) {
+        assert dataOff >= ITEMS_OFF + ITEM_SIZE && dataOff < buf.capacity() - 
KV_LEN_SIZE - VER_SIZE;
+
+        buf.putShort(FIRST_ENTRY_OFF, (short)dataOff);
+    }
 
-    private static int getEntrySize(ByteBuffer buf, int dataOff) {
-        return 0;
+    /**
+     * @param buf Buffer.
+     * @return Entry data offset.
+     */
+    private int getFirstEntryOffset(ByteBuffer buf) {
+        return buf.getShort(FIRST_ENTRY_OFF) & 0xFFFF;
     }
 
     /**
@@ -261,6 +294,16 @@ public class DataPageIO extends PageIO {
     }
 
     /**
+     * @param dataOff Data offset.
+     * @return Direct item.
+     */
+    private static short fromOffset(int dataOff) {
+        assert dataOff > ITEMS_OFF + ITEM_SIZE && dataOff < 65536: dataOff;
+
+        return (short)dataOff;
+    }
+
+    /**
      * @param indirectItem Indirect item.
      * @return Index of corresponding direct item.
      */
@@ -435,9 +478,18 @@ public class DataPageIO extends PageIO {
         if (cnt == 0)
             return;
 
-        int off = itemOffset(idx);
+        moveBytes(buf, itemOffset(idx), cnt * ITEM_SIZE, step * ITEM_SIZE);
+    }
 
-        // TODO
+    /**
+     * @param entrySize Entry size as returned by {@link #getEntrySize(int, 
int)}.
+     * @param firstOff First entry data offset.
+     * @param directCnt Direct items count.
+     * @param indirectCnt Indirect items count.
+     * @return {@code true} If there is enough space for the entry.
+     */
+    private static boolean enoughSpaceForEntry(int entrySize, int firstOff, 
int directCnt, int indirectCnt) {
+        return ITEMS_OFF + ITEM_SIZE * (directCnt + indirectCnt) <= firstOff - 
entrySize;
     }
 
     /**
@@ -458,26 +510,152 @@ public class DataPageIO extends PageIO {
         GridCacheVersion ver,
         int entrySize
     ) throws IgniteCheckedException {
-        if (entrySize >= buf.capacity() - ITEMS_OFF)
+        if (entrySize > buf.capacity() - ITEMS_OFF) // TODO span multiple data 
pages with a single large entry
             throw new IgniteException("Too big entry: " + key + " " + val);
 
+        int directCnt = getDirectCount(buf);
+        int indirectCnt = getIndirectCount(buf);
+
+        int dataOff = getFirstEntryOffset(buf);
+
+        // Compact if we do not have enough space.
+        if (!enoughSpaceForEntry(entrySize, dataOff, directCnt, indirectCnt)) {
+            dataOff = compactDataEntries(buf, directCnt);
+
+            assert enoughSpaceForEntry(entrySize, dataOff, directCnt, 
indirectCnt);
+        }
+
+        // Attempt to write data right before the first entry.
+        dataOff -= entrySize - ITEM_SIZE;
+
+        writeRowData(coctx, buf, dataOff, entrySize, key, val, ver);
+
+        setFirstEntryOffset(buf, dataOff);
 
+        int itemId = insertItem(buf, dataOff, directCnt, indirectCnt);
 
-        return 0;// TODO
+        assert check(itemId): itemId;
+
+        return (byte)itemId;
     }
 
     /**
      * @param buf Buffer.
      * @param dataOff Data offset.
+     * @param directCnt Direct items count.
+     * @param indirectCnt Indirect items count.
+     * @return Item ID (insertion index).
+     */
+    private int insertItem(ByteBuffer buf, int dataOff, int directCnt, int 
indirectCnt) {
+        if (indirectCnt > 0) {
+            // If the first indirect item is on correct place to become the 
last direct item, do the transition
+            // and insert the new item into the free slot which was referenced 
by this first indirect item.
+            short item = getItem(buf, directCnt);
+
+            if (itemId(item) == directCnt) {
+                int directItemIdx = directItemIndex(item);
+
+                setItem(buf, directCnt, getItem(buf, directItemIdx));
+                setItem(buf, directItemIdx, fromOffset(dataOff));
+
+                setDirectCount(buf, directCnt + 1);
+                setIndirectCount(buf, indirectCnt - 1);
+
+                return directItemIdx;
+            }
+        }
+
+        // Move all the indirect items forward to make a free slot and insert 
new item at the end of direct items.
+        moveItems(buf, directCnt, indirectCnt, +1);
+
+        setItem(buf, directCnt, fromOffset(dataOff));
+
+        setDirectCount(buf, directCnt + 1);
+
+        return directCnt;
+    }
+
+    /**
+     * @param buf Buffer.
+     * @param directCnt Direct items count.
+     * @return New first entry offset.
+     */
+    private int compactDataEntries(ByteBuffer buf, int directCnt) {
+        assert check(directCnt): directCnt;
+
+        int[] offs = new int[directCnt];
+
+        for (int i = 0; i < directCnt; i++) {
+            int off = toOffset(getItem(buf, i));
+
+            offs[i] = (off << 8) | i; // This way we'll be able to sort by 
offset using Arrays.sort(...).
+        }
+
+        Arrays.sort(offs);
+
+        // Move right all of the entries if possible to make the page as 
compact as possible to its tail.
+        int prevOff = buf.capacity();
+
+        for (int i = directCnt - 1; i >= 0; i--) {
+            int off = offs[i] >>> 8;
+
+            assert off < prevOff: off;
+
+            int entrySize = getEntrySize(buf, off, false);
+
+            int delta = prevOff - (off + entrySize);
+
+            if (delta != 0) { // Move right.
+                assert delta > 0: delta;
+
+                moveBytes(buf, off, entrySize, delta);
+
+                int itemId = offs[i] & 0xFF;
+
+                off += delta;
+
+                setItem(buf, itemId, fromOffset(off));
+            }
+
+            prevOff = off;
+        }
+
+        return prevOff;
+    }
+
+    /**
+     * @param buf Buffer.
+     * @param off Offset.
+     * @param cnt Count.
+     * @param step Step.
+     */
+    private static void moveBytes(ByteBuffer buf, int off, int cnt, int step) {
+        assert step != 0: step;
+
+        if (step > 0) {
+            for (int i = off + cnt - 1; i >= 0; i--)
+                buf.put(i + step, buf.get(i));
+        }
+        else {
+            for (int i = off, end = off + cnt; i < end; i++)
+                buf.put(i + step, buf.get(i));
+        }
+    }
+
+    /**
+     * @param coctx Cache object context.
+     * @param buf Buffer.
+     * @param dataOff Data offset.
+     * @param entrySize Entry size as returned by {@link #getEntrySize(int, 
int)}.
      * @param key Key.
      * @param val Value.
      * @param ver Version.
      */
-    public void writeRowDataInPlace(
+    public void writeRowData(
         CacheObjectContext coctx,
         ByteBuffer buf,
         int dataOff,
-        int keyValLen,
+        int entrySize,
         CacheObject key,
         CacheObject val,
         GridCacheVersion ver
@@ -485,7 +663,7 @@ public class DataPageIO extends PageIO {
         try {
             buf.position(dataOff);
 
-            buf.putShort((short)keyValLen);
+            buf.putShort((short)entrySize);
 
             boolean written = key.putValue(buf, coctx);
 
@@ -504,13 +682,4 @@ public class DataPageIO extends PageIO {
             buf.position(0);
         }
     }
-
-    /**
-     * @param buf Buffer.
-     * @param dataOff Data offset.
-     * @return Key and value size.
-     */
-    private static int getKeyValueSize(ByteBuffer buf, int dataOff) {
-        return buf.getShort(dataOff) & 0xFFFF;
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2596f7ca/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java
index c91ffbb..58126b7 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java
@@ -167,7 +167,7 @@ public class H2RowStore {
 
                 buf.position(dataOff);
 
-                // Skip key-value size.
+                // Skip entry size.
                 buf.getShort();
 
                 CacheObject key = coctx.processor().toCacheObject(coctx, buf);

Reply via email to