http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/CompareUtils.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/CompareUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/CompareUtils.java deleted file mode 100644 index a30c181..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/CompareUtils.java +++ /dev/null @@ -1,332 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.query.h2.database; - -import java.math.BigDecimal; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.util.GridUnsafe; -import org.h2.util.MathUtils; -import org.h2.value.Value; - -/** - * - */ -public class CompareUtils { - /** */ - private static final int UTF_8_MIN_2_BYTES = 0x80; - - /** */ - private static final int UTF_8_MIN_3_BYTES = 0x800; - - /** */ - private static final int UTF_8_MIN_4_BYTES = 0x10000; - - /** */ - private static final int UTF_8_MAX_CODE_POINT = 0x10ffff; - - /** */ - private static final BigDecimal MAX_LONG_DECIMAL = BigDecimal.valueOf(Long.MAX_VALUE); - - /** */ - private static final BigDecimal MIN_LONG_DECIMAL = BigDecimal.valueOf(Long.MIN_VALUE); - - /** - * @param x Value. - * @return Byte value. - */ - public static byte convertToByte(long x) { - if (x > Byte.MAX_VALUE || x < Byte.MIN_VALUE) - throw new IgniteException("Numeric value out of range: " + x); - - return (byte) x; - } - - /** - * @param x Value. - * @return Short value. - */ - public static short convertToShort(long x) { - if (x > Short.MAX_VALUE || x < Short.MIN_VALUE) - throw new IgniteException("Numeric value out of range: " + x); - - return (short) x; - } - - /** - * @param x Value. - * @return Int value. - */ - public static int convertToInt(long x) { - if (x > Integer.MAX_VALUE || x < Integer.MIN_VALUE) - throw new IgniteException("Numeric value out of range: " + x); - - return (int) x; - } - - /** - * @param x Value. - * @return Long value. - */ - public static long convertToLong(double x) { - if (x > Long.MAX_VALUE || x < Long.MIN_VALUE) - throw new IgniteException("Numeric value out of range: " + x); - - return Math.round(x); - } - - /** - * @param x Value. - * @return Long value. 
- */ - public static long convertToLong(BigDecimal x) { - if (x.compareTo(MAX_LONG_DECIMAL) > 0 || x.compareTo(MIN_LONG_DECIMAL) < 0) - throw new IgniteException("Numeric value out of range: " + x); - - return x.setScale(0, BigDecimal.ROUND_HALF_UP).longValue(); - } - - /** - * @param v Value1. - * @param val Value2. - * @return Compare result. - */ - public static int compareBoolean(boolean v, Value val) { - boolean v2 = val.getBoolean(); - - return (v == v2) ? 0 : (v ? 1 : -1); - } - - /** - * @param v Value1. - * @param val Value2. - * @return Compare result. - */ - public static int compareByte(byte v, Value val) { - byte v2 = val.getByte(); - - return MathUtils.compareInt(v, v2); - } - - /** - * @param val1Addr First string UTF-8 bytes address. - * @param val1Len Number of bytes in first string. - * @param val2Bytes Second string bytes. - * @param val2Off Second string offset. - * @param val2Len Number of bytes in second string. - * @return Compare result. - * @throws IgniteCheckedException In case of error. - */ - public static int compareUtf8(long val1Addr, - int val1Len, - byte[] val2Bytes, - int val2Off, - int val2Len) throws IgniteCheckedException { - int len = Math.min(val1Len, val2Len); - - for (int i = 0; i < len; i++) { - int b1 = GridUnsafe.getByte(val1Addr + i) & 0xFF; - int b2 = val2Bytes[val2Off + i] & 0xFF; - - if (b1 != b2) - return b1 > b2 ? 1 : -1; - } - - return Integer.compare(val1Len, val2Len); - } - - /** - * @param addr UTF-8 bytes address. - * @param len Number of bytes to decode. - * @param str String to compare with. - * @return Compare result. - * @throws IgniteCheckedException In case of error. - */ - public static int compareUtf8(long addr, int len, String str) throws IgniteCheckedException { - int pos = 0; - - int cntr = 0; - - int strLen = str.length(); - - // ASCII only optimized loop. 
- while (pos < len) { - byte ch = GridUnsafe.getByte(addr + pos); - - if (ch >= 0) { - char c0 = (char)ch; - - if (cntr < strLen) { - char c1 = str.charAt(cntr); - - if (c0 != c1) - return c0 - c1; - } - else - return 1; - - cntr++; - - pos++; - } - else - break; - } - - // TODO: check index bounds. - - while (pos < len) { - int ch = GridUnsafe.getByte(addr + pos++) & 0xff; - - // Convert UTF-8 to 21-bit codepoint. - if (ch < 0x80) { - // 0xxxxxxx -- length 1. - } - else if (ch < 0xc0) { - // 10xxxxxx -- illegal! - throw new IgniteException("Illegal UTF-8 sequence."); - } - else if (ch < 0xe0) { - // 110xxxxx 10xxxxxx - ch = ((ch & 0x1f) << 6); - - checkByte(GridUnsafe.getByte(addr + pos)); - - ch |= (GridUnsafe.getByte(addr + pos++) & 0x3f); - - checkMinimal(ch, UTF_8_MIN_2_BYTES); - } - else if (ch < 0xf0) { - // 1110xxxx 10xxxxxx 10xxxxxx - ch = ((ch & 0x0f) << 12); - - checkByte(GridUnsafe.getByte(addr + pos)); - - ch |= ((GridUnsafe.getByte(addr + pos++) & 0x3f) << 6); - - checkByte(GridUnsafe.getByte(addr + pos)); - - ch |= (GridUnsafe.getByte(addr + pos++) & 0x3f); - - checkMinimal(ch, UTF_8_MIN_3_BYTES); - } - else if (ch < 0xf8) { - // 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx - ch = ((ch & 0x07) << 18); - - checkByte(GridUnsafe.getByte(addr + pos)); - - ch |= ((GridUnsafe.getByte(addr + pos++) & 0x3f) << 12); - - checkByte(GridUnsafe.getByte(addr + pos)); - - ch |= ((GridUnsafe.getByte(addr + pos++) & 0x3f) << 6); - - checkByte(GridUnsafe.getByte(addr + pos)); - - ch |= (GridUnsafe.getByte(addr + pos++) & 0x3f); - - checkMinimal(ch, UTF_8_MIN_4_BYTES); - } - else - throw new IgniteException("Illegal UTF-8 sequence."); - - if (ch > UTF_8_MAX_CODE_POINT) - throw new IgniteException("Illegal UTF-8 sequence."); - - // Convert 21-bit codepoint to Java chars: - // 0..ffff are represented directly as a single char - // 10000..10ffff are represented as a "surrogate pair" of two chars - if (ch > 0xffff) { - // Use a surrogate pair to represent it. 
- ch -= 0x10000; // ch is now 0..fffff (20 bits) - - char c0 = (char)(0xd800 + (ch >> 10)); // Top 10 bits. - - if (cntr < strLen) { - char c1 = str.charAt(cntr); - - if (c0 != c1) - return c0 - c1; - } - else - return 1; - - cntr++; - - c0 = (char)(0xdc00 + (ch & 0x3ff)); // Bottom 10 bits. - - if (cntr < strLen) { - char c1 = str.charAt(cntr); - - if (c0 != c1) - return c0 - c1; - } - else - return 1; - - cntr++; - } - else if (ch >= 0xd800 && ch < 0xe000) - // Not allowed to encode the surrogate range directly. - throw new IgniteException("Illegal UTF-8 sequence."); - else { - // Normal case. - char c0 = (char)ch; - - if (cntr < strLen) { - char c1 = str.charAt(cntr); - - if (c0 != c1) - return c0 - c1; - } - else - return 1; - - cntr++; - } - } - - // Check if we ran past the end without seeing an exception. - if (pos > len) - throw new IgniteException("Illegal UTF-8 sequence."); - - return cntr - strLen; - } - - /** - * @param ch UTF-8 byte. - * @throws IgniteException In case of error. - */ - private static void checkByte(int ch) { - if ((ch & 0xc0) != 0x80) - throw new IgniteException("Illegal UTF-8 sequence."); - } - - /** - * @param ch UTF-8 byte. - * @param minVal Minimum value. - * @throws IgniteException In case of error. - */ - private static void checkMinimal(int ch, int minVal) { - if (ch >= minVal) - return; - - throw new IgniteException("Illegal UTF-8 sequence."); - } -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/DataPageIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/DataPageIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/DataPageIO.java deleted file mode 100644 index c0fd3e9..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/DataPageIO.java +++ /dev/null @@ -1,281 +0,0 @@ -package org.apache.ignite.internal.processors.query.h2.database; - -import java.nio.ByteBuffer; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.CacheObjectContext; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; - -/** - * - */ -public class DataPageIO extends PageIO { - /** */ - private static final DataPageIO V1 = new DataPageIO(); - - /** */ - private static final int OCCUPIED_SIZE_OFF = COMMON_HEADER_END; - - /** */ - private static final int ALL_CNT_OFF = OCCUPIED_SIZE_OFF + 2; - - /** */ - private static final int LIVE_CNT_OFF = ALL_CNT_OFF + 2; - - /** */ - private static final int ITEMS_OFF = LIVE_CNT_OFF + 2; - - /** */ - private static final int ITEM_SIZE = 2; - - /** - * @param buf Buffer. - * @return Instance. - */ - public static DataPageIO forPage(ByteBuffer buf) { - assert getType(buf) == T_DATA; - - return forVersion(getVersion(buf)); - } - - /** - * @return Latest data page format. - */ - public static DataPageIO latest() { - return V1; - } - - /** - * @param ver Version. - * @return Instance for version. 
- */ - public static DataPageIO forVersion(int ver) { - switch (ver){ - case 1: - return V1; - - default: - throw new IgniteException("Unsupported version: " + ver); - } - } - - protected DataPageIO() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public int getType() { - return T_DATA; - } - - /** {@inheritDoc} */ - @Override public int getVersion() { - return 1; - } - - /** {@inheritDoc} */ - @Override public void initNewPage(ByteBuffer buf, long pageId) { - super.initNewPage(buf, pageId); - setAllCount(buf, 0); - setLiveCount(buf, 0); - setOccupiedSize(buf, 0); - } - - public int getOccupiedSize(ByteBuffer buf) { - return buf.getShort(OCCUPIED_SIZE_OFF) & 0xFFFF; - } - - public void setOccupiedSize(ByteBuffer buf, int size) { - buf.putShort(OCCUPIED_SIZE_OFF, (short)size); - - assert getOccupiedSize(buf) == size; - } - - public int getAllCount(ByteBuffer buf) { - return buf.getShort(ALL_CNT_OFF) & 0xFFFF; - } - - public void setAllCount(ByteBuffer buf, int cnt) { - buf.putShort(ALL_CNT_OFF, (short)cnt); - - assert cnt == getAllCount(buf); - } - - public int getLiveCount(ByteBuffer buf) { - return buf.getShort(LIVE_CNT_OFF) & 0xFFFF; - } - - public void setLiveCount(ByteBuffer buf, int cnt) { - buf.putShort(LIVE_CNT_OFF, (short)cnt); - - assert cnt == getLiveCount(buf); - } - - public boolean canAddEntry(ByteBuffer buf, int entrySize) { - int free = buf.capacity() - ITEMS_OFF - getOccupiedSize(buf); - - if (free < entrySize) - return false; - - free -= (getAllCount(buf) - getLiveCount(buf)) * ITEM_SIZE; - - return free >= entrySize; - } - - /** - * @param keySize Key size. - * @param valSize Value size. - * @return Entry size including item. - */ - private static int entrySize(int keySize, int valSize) { - return ITEM_SIZE + 2/*key+val len*/ + keySize + valSize + 24/*ver*/; - } - - /** - * @param idx Index of item. - * @return Offset in bytes. 
- */ - private static int offset(int idx) { - return ITEMS_OFF + idx * ITEM_SIZE; - } - - /** - * @param buf Buffer. - * @param idx Index of item. - * @return Data offset in bytes. - */ - public int getDataOffset(ByteBuffer buf, int idx) { - return buf.getShort(offset(idx)) & 0xFFFF; - } - - /** - * @param buf Buffer. - * @param idx Index of item. - * @param dataOff Data offset in bytes. - */ - private void setDataOffset(ByteBuffer buf, int idx, int dataOff) { - buf.putShort(offset(idx), (short)dataOff); - - assert dataOff == getDataOffset(buf, idx); - } - - /** - * Make a window for data entry. - * - * @param buf Buffer. - * @param idx Index of the new item. - * @param allCnt All count. - * @param entrySize Entry size. - * @return Data offset for the new entry. - */ - public int makeWindow(ByteBuffer buf, int idx, int allCnt, int entrySize) { - if (idx == allCnt) { // Adding to the end of items. - int off = offset(idx); - int lastDataOff = allCnt == 0 ? buf.capacity() : getDataOffset(buf, allCnt - 1); - - if (lastDataOff - off < entrySize) // TODO try to defragment - return -1; - - return lastDataOff - entrySize + ITEM_SIZE; - } - else { - //TODO defragment page with respect to idx and entrySize (if idx is not last, the window must be not first) - throw new UnsupportedOperationException(); - } - } - - public int addRow( - CacheObjectContext coctx, - ByteBuffer buf, - CacheObject key, - CacheObject val, - GridCacheVersion ver) throws IgniteCheckedException - { - int keyLen = key.valueBytesLength(coctx); - int valLen = val.valueBytesLength(coctx); - int entrySize = entrySize(keyLen, valLen); - - if (entrySize >= buf.capacity() - ITEMS_OFF) - throw new IgniteException("Too big entry: " + keyLen + " " + valLen); - - if (!canAddEntry(buf, entrySize)) - return -1; - - int liveCnt = getLiveCount(buf); - int allCnt = getAllCount(buf); - int idx = 0; - - if (allCnt == liveCnt) - idx = allCnt; // Allocate new idx at allCnt if all are alive. 
- else { - // Lookup for a free parking lot. - while (idx < allCnt) { - if (getDataOffset(buf, idx) == 0) - break; - - idx++; - } - } - - int dataOff = makeWindow(buf, idx, allCnt, entrySize); - - if (dataOff == -1) - return -1; - - // Write data. - writeRowDataInPlace(coctx, buf, dataOff, keyLen + valLen, key, val, ver); - // Write item. - setDataOffset(buf, idx, dataOff); - - // Update header. - setOccupiedSize(buf, getOccupiedSize(buf) + entrySize); - setAllCount(buf, allCnt + 1); - setLiveCount(buf, liveCnt + 1); - - return idx; - } - - /** - * @param buf Buffer. - * @param dataOff Data offset. - * @param key Key. - * @param val Value. - * @param ver Version. - */ - public void writeRowDataInPlace( - CacheObjectContext coctx, - ByteBuffer buf, - int dataOff, - int keyValLen, - CacheObject key, - CacheObject val, - GridCacheVersion ver - ) throws IgniteCheckedException { - try { - buf.position(dataOff); - - buf.putShort((short)keyValLen); - - boolean written = key.putValue(buf, coctx); - - assert written; - - written = val.putValue(buf, coctx); - - assert written; - - buf.putInt(ver.topologyVersion()); - buf.putInt(ver.nodeOrderAndDrIdRaw()); - buf.putLong(ver.globalTime()); - buf.putLong(ver.order()); - } - finally { - buf.position(0); - } - } - - public int getKeyValueSize(ByteBuffer buf, int idx) { - return buf.getShort(getDataOffset(buf, idx)) & 0xFFFF; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/DataStore.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/DataStore.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/DataStore.java new file mode 100644 index 0000000..729ad11 --- /dev/null +++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/DataStore.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database; + +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.query.h2.database.io.BPlusIO; + +/** + * Data store. + */ +public interface DataStore<T> { + /** + * @param io IO. + * @param buf Buffer. + * @param idx Index. + * @return Data row. 
+ */ + public T getRow(BPlusIO<T> io, ByteBuffer buf, int idx) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java new file mode 100644 index 0000000..8e10ae4 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowStore.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2.database; + +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.Page; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.query.h2.database.io.BPlusIO; +import org.apache.ignite.internal.processors.query.h2.database.io.DataPageIO; +import org.apache.ignite.internal.processors.query.h2.database.io.H2RowLinkIO; +import org.apache.ignite.internal.processors.query.h2.database.util.PageHandler; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; + +import static org.apache.ignite.internal.pagemem.PageIdUtils.dwordsOffset; +import static org.apache.ignite.internal.pagemem.PageIdUtils.linkFromDwordOffset; +import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId; +import static org.apache.ignite.internal.processors.query.h2.database.util.PageHandler.writePage; + +/** + * Data store for H2 rows. 
+ */ +public class H2RowStore implements DataStore<GridH2Row> { + /** */ + private final PageMemory pageMem; + + /** */ + private final GridH2RowDescriptor rowDesc; + + /** */ + private final GridCacheContext<?,?> cctx; + + /** */ + private final CacheObjectContext coctx; + + /** */ + private volatile long lastDataPageId; + + /** */ + private final PageHandler<GridH2Row> writeRow = new PageHandler<GridH2Row>() { + @Override public int run(Page page, ByteBuffer buf, GridH2Row row, int ignore) throws IgniteCheckedException { + DataPageIO io = DataPageIO.VERSIONS.forPage(buf); + + int idx = io.addRow(coctx, buf, row.key, row.val, row.ver); + + if (idx != -1) { + row.link = linkFromDwordOffset(page.id(), idx); + + assert row.link != 0; + } + + return idx; + } + }; + + /** + * @param pageMem Page memory. + * @param rowDesc Row descriptor. + * @param cctx Cache context. + */ + public H2RowStore(PageMemory pageMem, GridH2RowDescriptor rowDesc, GridCacheContext<?,?> cctx) { + this.pageMem = pageMem; + this.rowDesc = rowDesc; + this.cctx = cctx; + this.coctx = cctx.cacheObjectContext(); + } + + /** {@inheritDoc} */ + @Override public GridH2Row getRow(BPlusIO<GridH2Row> io, ByteBuffer buf, int idx) throws IgniteCheckedException { + long link = ((H2RowLinkIO)io).getLink(buf, idx); + + return getRow(link); + } + + /** + * @param pageId Page ID. + * @return Page. + * @throws IgniteCheckedException If failed. + */ + private Page page(long pageId) throws IgniteCheckedException { + return pageMem.page(new FullPageId(pageId, cctx.cacheId())); + } + + /** + * @param part Partition. + * @return Allocated page. + * @throws IgniteCheckedException if failed. + */ + private Page allocatePage(int part) throws IgniteCheckedException { + FullPageId fullPageId = pageMem.allocatePage(cctx.cacheId(), part, PageIdAllocator.FLAG_DATA); + + return pageMem.page(fullPageId); + } + + /** + * !!! This method must be invoked in read or write lock of referring index page. It is needed to + * !!! 
make sure that row at this link will be invisible, when the link will be removed from + * !!! from all the index pages, so that row can be safely erased from the data page. + * + * @param link Link. + * @return Row. + */ + private GridH2Row getRow(long link) throws IgniteCheckedException { + try (Page page = page(pageId(link))) { + ByteBuffer buf = page.getForRead(); + + try { + GridH2Row existing = rowDesc.cachedRow(link); + + if (existing != null) + return existing; + + DataPageIO io = DataPageIO.VERSIONS.forPage(buf); + + int dataOff = io.getDataOffset(buf, dwordsOffset(link)); + + buf.position(dataOff); + + // Skip key-value size. + buf.getShort(); + + CacheObject key = coctx.processor().toCacheObject(coctx, buf); + CacheObject val = coctx.processor().toCacheObject(coctx, buf); + + int topVer = buf.getInt(); + int nodeOrderDrId = buf.getInt(); + long globalTime = buf.getLong(); + long order = buf.getLong(); + + GridCacheVersion ver = new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order); + + GridH2Row res; + + try { + res = rowDesc.createRow(key, PageIdUtils.partId(link), val, ver, 0); + + res.link = link; + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + assert res.ver != null; + + rowDesc.cache(res); + + return res; + } + finally { + page.releaseRead(); + } + } + } + + /** + * @param expLastDataPageId Expected last data page ID. + * @return Next data page ID. + */ + private synchronized long nextDataPage(long expLastDataPageId, int partId) throws IgniteCheckedException { + if (expLastDataPageId != lastDataPageId) + return lastDataPageId; + + long pageId; + + try (Page page = allocatePage(partId)) { + pageId = page.id(); + + ByteBuffer buf = page.getForInitialWrite(); + + DataPageIO.VERSIONS.latest().initNewPage(buf, page.id()); + } + + return lastDataPageId = pageId; + } + + /** + * @param row Row. 
+ */ + public void writeRowData(GridH2Row row) throws IgniteCheckedException { + assert row.link == 0; + + while (row.link == 0) { + long pageId = lastDataPageId; + + if (pageId == 0) + pageId = nextDataPage(0, row.partId); + + try (Page page = page(pageId)) { + if (writePage(page, writeRow, row, -1, -1) >= 0) + return; // Successful write. + } + + nextDataPage(pageId, row.partId); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/PageIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/PageIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/PageIO.java deleted file mode 100644 index acb89dc..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/PageIO.java +++ /dev/null @@ -1,133 +0,0 @@ -package org.apache.ignite.internal.processors.query.h2.database; - -import java.nio.ByteBuffer; - -/** - * Base format for all the page types. - */ -public abstract class PageIO { - /** */ - private static final int TYPE_OFF = 0; - - /** */ - private static final int VER_OFF = TYPE_OFF + 2; - - /** */ - private static final int PAGE_ID_OFF = VER_OFF + 2; - - /** */ - private static final int CRC_OFF = PAGE_ID_OFF + 8; - - /** */ - public static final int COMMON_HEADER_END = 32; // type(2) + ver(2) + pageId(8) + crc(4) + reserved(16) - - /* All the page types. */ - - /** */ - public static final short T_DATA = 1; - - /** */ - public static final short T_BPLUS_REF3_INNER = 2; - - /** */ - public static final short T_BPLUS_REF3_LEAF = 3; - - /** */ - public static final short T_BPLUS_REF3_META = 4; - - /** - * @return Page type. - */ - public static int getType(ByteBuffer buf) { - return buf.getShort(TYPE_OFF) & 0xFFFF; - } - - /** - * @param buf Buffer. 
- * @param type Type. - */ - public static void setType(ByteBuffer buf, int type) { - assert type >= T_DATA && type <= T_BPLUS_REF3_META : type; - - buf.putShort(TYPE_OFF, (short)type); - - assert getType(buf) == type; - } - - /** - * @param buf Buffer. - * @return Version. - */ - public static int getVersion(ByteBuffer buf) { - return buf.getShort(VER_OFF) & 0xFFFF; - } - - /** - * @param buf Buffer. - * @param ver Version. - */ - public static void setVersion(ByteBuffer buf, int ver) { - buf.putShort(VER_OFF, (short)ver); - - assert getVersion(buf) == ver; - } - - /** - * @param buf Buffer. - * @return Page ID. - */ - public static long getPageId(ByteBuffer buf) { - return buf.getLong(PAGE_ID_OFF); - } - - /** - * @param buf Buffer. - * @param pageId Page ID. - */ - public static void setPageId(ByteBuffer buf, long pageId) { - buf.putLong(PAGE_ID_OFF, pageId); - - assert getPageId(buf) == pageId; - } - - /** - * @param buf Buffer. - * @return Checksum. - */ - public static int getCrc(ByteBuffer buf) { - return buf.getInt(CRC_OFF); - } - - /** - * @param buf Buffer. - * @param crc Checksum. - */ - public static void setCrc(ByteBuffer buf, int crc) { - buf.putInt(CRC_OFF, crc); - } - - /** - * @return Type. - */ - public abstract int getType(); - - /** - * @return Version. - */ - public abstract int getVersion(); - - /** - * @param buf Buffer. - * @param pageId Page ID. 
- */ - public void initNewPage(ByteBuffer buf, long pageId) { - setType(buf, getType()); - setVersion(buf, getVersion()); - setPageId(buf, pageId); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return getClass().getSimpleName() + "[ver=" + getVersion() + "]"; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIO.java new file mode 100644 index 0000000..d74079f --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIO.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.processors.query.h2.database.DataStore; + +/** + * Abstract IO routines for B+Tree pages. 
+ */ +public abstract class BPlusIO<T> extends PageIO { + /** */ + protected static final int CNT_OFF = COMMON_HEADER_END; + + /** */ + protected static final int FORWARD_OFF = CNT_OFF + 2; + + /** */ + protected static final int REMOVE_ID_OFF = FORWARD_OFF + 8; + + /** */ + protected static final int ITEMS_OFF = REMOVE_ID_OFF + 8; + + /** + * @param ver Page format version. + */ + protected BPlusIO(int ver) { + super(ver); + } + + /** {@inheritDoc} */ + @Override public void initNewPage(ByteBuffer buf, long pageId) { + super.initNewPage(buf, pageId); + + setCount(buf, 0); + setForward(buf, 0); + setRemoveId(buf, 0); + } + + /** + * @param buf Buffer. + * @return Forward page ID. + */ + public long getForward(ByteBuffer buf) { + return buf.getLong(FORWARD_OFF); + } + + /** + * @param buf Buffer. + * @param pageId Forward page ID. + */ + public void setForward(ByteBuffer buf, long pageId) { + buf.putLong(FORWARD_OFF, pageId); + + assert getForward(buf) == pageId; + } + + /** + * @param buf Buffer. + * @return Remove ID. + */ + public long getRemoveId(ByteBuffer buf) { + return buf.getLong(REMOVE_ID_OFF); + } + + /** + * @param buf Buffer. + * @param rmvId Remove ID. + */ + public void setRemoveId(ByteBuffer buf, long rmvId) { + buf.putLong(REMOVE_ID_OFF, rmvId); + + assert getRemoveId(buf) == rmvId; + } + + /** + * @param buf Buffer. + * @return Items count in the page. + */ + public int getCount(ByteBuffer buf) { + int cnt = buf.getShort(CNT_OFF) & 0xFFFF; + + assert cnt >= 0: cnt; + + return cnt; + } + + /** + * @param buf Buffer. + * @param cnt Count. + */ + public void setCount(ByteBuffer buf, int cnt) { + assert cnt >= 0: cnt; + + buf.putShort(CNT_OFF, (short)cnt); + + assert getCount(buf) == cnt; + } + + /** + * @return {@code true} if it is a leaf page. + */ + public abstract boolean isLeaf(); + + /** + * @param buf Buffer. + * @return Max items count. 
+ */ + public abstract int getMaxCount(ByteBuffer buf); + + /** + * Store the needed info about the row in the page. Leaf and inner pages can store different info. + * + * @param buf Buffer. + * @param idx Index. + * @param row Row. + */ + public abstract void store(ByteBuffer buf, int idx, T row); + + /** + * Store row info from the given source. + * + * @param dst Destination buffer + * @param dstIdx Destination index. + * @param srcIo Source IO. + * @param src Source buffer. + * @param srcIdx Source index. + */ + public abstract void store(ByteBuffer dst, int dstIdx, BPlusIO<T> srcIo, ByteBuffer src, int srcIdx); + + /** + * @return {@code true} If we can get the whole row from this page using {@link DataStore}. + * Must always be {@code true} for leaf pages. + */ + public abstract boolean canGetRow(); + + /** + * Copy items from source buffer to destination buffer. Both pages must be of the same type. + * + * @param src Source buffer. + * @param dst Destination buffer. + * @param srcIdx Source begin index. + * @param dstIdx Destination begin index. + * @param cnt Items count. + * @param cpLeft Copy leftmost link (makes sense only for inner pages). 
+ */ + public abstract void copyItems(ByteBuffer src, ByteBuffer dst, int srcIdx, int dstIdx, int cnt, boolean cpLeft); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIOInner.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIOInner.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIOInner.java new file mode 100644 index 0000000..32b4bab --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIOInner.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import java.nio.ByteBuffer; + +/** + * Abstract IO routines for B+Tree inner pages. 
+ */ +public abstract class BPlusIOInner<T> extends BPlusIO<T> { + /** */ + protected static final int ITEM_SIZE = 16; + + /** */ + protected static final int SHIFT_LEFT = ITEMS_OFF; + + /** */ + protected static final int SHIFT_LINK = ITEMS_OFF + 8; + + /** */ + protected static final int SHIFT_RIGHT = ITEMS_OFF + 16; + + /** + * @param ver Page format version. + */ + protected BPlusIOInner(int ver) { + super(ver); + } + + /** {@inheritDoc} */ + @Override public int getMaxCount(ByteBuffer buf) { + // (capacity - ITEMS_OFF - RIGHTMOST_PAGE_ID_SLOT_SIZE) / ITEM_SIZE + return (buf.capacity() - ITEMS_OFF - 8) >>> 4; + } + + /** {@inheritDoc} */ + @Override public boolean isLeaf() { + return false; + } + + /** + * @param buf Buffer. + * @param idx Index. + * @return Page ID. + */ + public long getLeft(ByteBuffer buf, int idx) { + return buf.getLong(offset(idx, SHIFT_LEFT)); + } + + /** + * @param buf Buffer. + * @param idx Index. + * @param pageId Page ID. + */ + public void setLeft(ByteBuffer buf, int idx, long pageId) { + buf.putLong(offset(idx, SHIFT_LEFT), pageId); + + assert pageId == getLeft(buf, idx); + } + + /** + * @param buf Buffer. + * @param idx Index. + * @return Page ID. + */ + public long getRight(ByteBuffer buf, int idx) { + return buf.getLong(offset(idx, SHIFT_RIGHT)); + } + + /** + * @param buf Buffer. + * @param idx Index. + * @param pageId Page ID. 
+ */ + public void setRight(ByteBuffer buf, int idx, long pageId) { + buf.putLong(offset(idx, SHIFT_RIGHT), pageId); + + assert pageId == getRight(buf, idx); + } + + /** {@inheritDoc} */ + @Override public void copyItems(ByteBuffer src, ByteBuffer dst, int srcIdx, int dstIdx, int cnt, + boolean cpLeft) { + assert srcIdx != dstIdx || src != dst; + + if (dstIdx > srcIdx) { + for (int i = cnt - 1; i >= 0; i--) { + dst.putLong(offset(dstIdx + i, SHIFT_RIGHT), src.getLong(offset(srcIdx + i, SHIFT_RIGHT))); + dst.putLong(offset(dstIdx + i, SHIFT_LINK), src.getLong(offset(srcIdx + i, SHIFT_LINK))); + } + + if (cpLeft) + dst.putLong(offset(dstIdx, SHIFT_LEFT), src.getLong(offset(srcIdx, SHIFT_LEFT))); + } + else { + if (cpLeft) + dst.putLong(offset(dstIdx, SHIFT_LEFT), src.getLong(offset(srcIdx, SHIFT_LEFT))); + + for (int i = 0; i < cnt; i++) { + dst.putLong(offset(dstIdx + i, SHIFT_RIGHT), src.getLong(offset(srcIdx + i, SHIFT_RIGHT))); + dst.putLong(offset(dstIdx + i, SHIFT_LINK), src.getLong(offset(srcIdx + i, SHIFT_LINK))); + } + } + } + + /** + * @param idx Index of element. + * @param shift It can be either link itself or left or right page ID. + * @return Offset from byte buffer begin in bytes. 
+ */ + protected static int offset(int idx, int shift) { + assert idx >= 0: idx; + + return shift + ITEM_SIZE * idx; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIOLeaf.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIOLeaf.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIOLeaf.java new file mode 100644 index 0000000..67889ea --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIOLeaf.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import java.nio.ByteBuffer; + +/** + * Abstract IO routines for B+Tree leaf pages. + */ +public abstract class BPlusIOLeaf<T> extends BPlusIO<T> { + /** */ + protected static final int ITEM_SIZE = 8; + + /** + * @param ver Page format version. 
+ */ + protected BPlusIOLeaf(int ver) { + super(ver); + } + + /** {@inheritDoc} */ + @Override public boolean isLeaf() { + return true; + } + + /** {@inheritDoc} */ + @Override public int getMaxCount(ByteBuffer buf) { + return (buf.capacity() - ITEMS_OFF) >>> 3; // divide by ITEM_SIZE + } + +// /** {@inheritDoc} */ +// @Override public long getLink(ByteBuffer buf, int idx) { +// assert idx < getCount(buf): idx; +// +// return buf.getLong(offset(idx)); +// } +// +// /** {@inheritDoc} */ +// @Override public void setLink(ByteBuffer buf, int idx, long link) { +// buf.putLong(offset(idx), link); +// +// assert getLink(buf, idx) == link; +// } + + /** {@inheritDoc} */ + @Override public void copyItems(ByteBuffer src, ByteBuffer dst, int srcIdx, int dstIdx, int cnt, + boolean cpLeft) { + assert srcIdx != dstIdx || src != dst; + + if (dstIdx > srcIdx) { + for (int i = cnt - 1; i >= 0; i--) + dst.putLong(offset(dstIdx + i), src.getLong(offset(srcIdx + i))); + } + else { + for (int i = 0; i < cnt; i++) + dst.putLong(offset(dstIdx + i), src.getLong(offset(srcIdx + i))); + } + } + + /** + * @param idx Index of item. + * @return Offset. 
+ */ + protected static int offset(int idx) { + assert idx >= 0: idx; + + return ITEMS_OFF + idx * ITEM_SIZE; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIOMeta.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIOMeta.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIOMeta.java new file mode 100644 index 0000000..2255341 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/BPlusIOMeta.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import java.nio.ByteBuffer; + +/** + * IO routines for B+Tree meta pages. 
+ */ +public class BPlusIOMeta extends PageIO { + /** */ + public static final IORegistry<BPlusIOMeta> VERSIONS = new IORegistry<>( + new BPlusIOMeta(1) + ); + + /** */ + private static final int LVLS_OFF = COMMON_HEADER_END; + + /** */ + private static final int REFS_OFF = LVLS_OFF + 1; + + /** + * @param ver Page format version. + */ + protected BPlusIOMeta(int ver) { + super(ver); + } + + /** {@inheritDoc} */ + @Override public int getType() { + return T_BPLUS_META; + } + + /** {@inheritDoc} */ + @Override public void initNewPage(ByteBuffer buf, long pageId) { + super.initNewPage(buf, pageId); + + setLevelsCount(buf, 0); + } + + /** + * @param buf Buffer. + * @return Number of levels in this tree. + */ + public int getLevelsCount(ByteBuffer buf) { + return buf.get(LVLS_OFF); + } + + /** + * @param buf Buffer. + * @param lvls Number of levels in this tree. + */ + public void setLevelsCount(ByteBuffer buf, int lvls) { + assert lvls >= 0 && lvls < 30; + + buf.put(LVLS_OFF, (byte)lvls); + + assert getLevelsCount(buf) == lvls; + } + + /** + * @param lvl Level. + * @return Offset for page reference. + */ + private static int offset(int lvl) { + return lvl * 8 + REFS_OFF; + } + + /** + * @param buf Buffer. + * @param lvl Level. + * @return Page reference at that level. + */ + public long getLeftmostPageId(ByteBuffer buf, int lvl) { + return buf.getLong(offset(lvl)); + } + + /** + * @param buf Buffer. + * @param lvl Level. + * @param pageId Page ID. + */ + public void setLeftmostPageId(ByteBuffer buf, int lvl, long pageId) { + assert lvl >= 0 && lvl < getLevelsCount(buf); + + buf.putLong(offset(lvl), pageId); + + assert getLeftmostPageId(buf, lvl) == pageId; + } + + /** + * @param buf Buffer. + * @return Root level. + */ + public int getRootLevel(ByteBuffer buf) { + int lvls = getLevelsCount(buf); // The highest level page is root. 
+ + assert lvls > 0 : lvls; + + return lvls - 1; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/DataPageIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/DataPageIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/DataPageIO.java new file mode 100644 index 0000000..74b32d8 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/DataPageIO.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; + +/** + * Data pages IO. 
+ */ +public class DataPageIO extends PageIO { + /** */ + public static final IORegistry<DataPageIO> VERSIONS = new IORegistry<>( + new DataPageIO(1) + ); + + /** */ + private static final int OCCUPIED_SIZE_OFF = COMMON_HEADER_END; + + /** */ + private static final int ALL_CNT_OFF = OCCUPIED_SIZE_OFF + 2; + + /** */ + private static final int LIVE_CNT_OFF = ALL_CNT_OFF + 2; + + /** */ + private static final int ITEMS_OFF = LIVE_CNT_OFF + 2; + + /** */ + private static final int ITEM_SIZE = 2; + + /** + * @param ver Page format version. + */ + protected DataPageIO(int ver) { + super(ver); + } + + /** {@inheritDoc} */ + @Override public int getType() { + return T_DATA; + } + + /** {@inheritDoc} */ + @Override public void initNewPage(ByteBuffer buf, long pageId) { + super.initNewPage(buf, pageId); + + setAllCount(buf, 0); + setLiveCount(buf, 0); + setOccupiedSize(buf, 0); + } + + public int getOccupiedSize(ByteBuffer buf) { + return buf.getShort(OCCUPIED_SIZE_OFF) & 0xFFFF; + } + + public void setOccupiedSize(ByteBuffer buf, int size) { + buf.putShort(OCCUPIED_SIZE_OFF, (short)size); + + assert getOccupiedSize(buf) == size; + } + + public int getAllCount(ByteBuffer buf) { + return buf.getShort(ALL_CNT_OFF) & 0xFFFF; + } + + public void setAllCount(ByteBuffer buf, int cnt) { + buf.putShort(ALL_CNT_OFF, (short)cnt); + + assert cnt == getAllCount(buf); + } + + public int getLiveCount(ByteBuffer buf) { + return buf.getShort(LIVE_CNT_OFF) & 0xFFFF; + } + + public void setLiveCount(ByteBuffer buf, int cnt) { + buf.putShort(LIVE_CNT_OFF, (short)cnt); + + assert cnt == getLiveCount(buf); + } + + public boolean canAddEntry(ByteBuffer buf, int entrySize) { + int free = buf.capacity() - ITEMS_OFF - getOccupiedSize(buf); + + if (free < entrySize) + return false; + + free -= (getAllCount(buf) - getLiveCount(buf)) * ITEM_SIZE; + + return free >= entrySize; + } + + /** + * @param keySize Key size. + * @param valSize Value size. + * @return Entry size including item. 
+ */ + private static int entrySize(int keySize, int valSize) { + return ITEM_SIZE + 2/*key+val len*/ + keySize + valSize + 24/*ver*/; + } + + /** + * @param idx Index of item. + * @return Offset in bytes. + */ + private static int offset(int idx) { + return ITEMS_OFF + idx * ITEM_SIZE; + } + + /** + * @param buf Buffer. + * @param idx Index of item. + * @return Data offset in bytes. + */ + public int getDataOffset(ByteBuffer buf, int idx) { + return buf.getShort(offset(idx)) & 0xFFFF; + } + + /** + * @param buf Buffer. + * @param idx Index of item. + * @param dataOff Data offset in bytes. + */ + private void setDataOffset(ByteBuffer buf, int idx, int dataOff) { + buf.putShort(offset(idx), (short)dataOff); + + assert dataOff == getDataOffset(buf, idx); + } + + /** + * Make a window for data entry. + * + * @param buf Buffer. + * @param idx Index of the new item. + * @param allCnt All count. + * @param entrySize Entry size. + * @return Data offset for the new entry. + */ + public int makeWindow(ByteBuffer buf, int idx, int allCnt, int entrySize) { + if (idx == allCnt) { // Adding to the end of items. + int off = offset(idx); + int lastDataOff = allCnt == 0 ? 
buf.capacity() : getDataOffset(buf, allCnt - 1); + + if (lastDataOff - off < entrySize) // TODO try to defragment + return -1; + + return lastDataOff - entrySize + ITEM_SIZE; + } + else { + //TODO defragment page with respect to idx and entrySize (if idx is not last, the window must be not first) + throw new UnsupportedOperationException(); + } + } + + public int addRow( + CacheObjectContext coctx, + ByteBuffer buf, + CacheObject key, + CacheObject val, + GridCacheVersion ver) throws IgniteCheckedException + { + int keyLen = key.valueBytesLength(coctx); + int valLen = val.valueBytesLength(coctx); + int entrySize = entrySize(keyLen, valLen); + + if (entrySize >= buf.capacity() - ITEMS_OFF) + throw new IgniteException("Too big entry: " + keyLen + " " + valLen); + + if (!canAddEntry(buf, entrySize)) + return -1; + + int liveCnt = getLiveCount(buf); + int allCnt = getAllCount(buf); + int idx = 0; + + if (allCnt == liveCnt) + idx = allCnt; // Allocate new idx at allCnt if all are alive. + else { + // Lookup for a free parking lot. + while (idx < allCnt) { + if (getDataOffset(buf, idx) == 0) + break; + + idx++; + } + } + + int dataOff = makeWindow(buf, idx, allCnt, entrySize); + + if (dataOff == -1) + return -1; + + // Write data. + writeRowDataInPlace(coctx, buf, dataOff, keyLen + valLen, key, val, ver); + // Write item. + setDataOffset(buf, idx, dataOff); + + // Update header. + setOccupiedSize(buf, getOccupiedSize(buf) + entrySize); + setAllCount(buf, allCnt + 1); + setLiveCount(buf, liveCnt + 1); + + return idx; + } + + /** + * @param buf Buffer. + * @param dataOff Data offset. + * @param key Key. + * @param val Value. + * @param ver Version. 
+ */ + public void writeRowDataInPlace( + CacheObjectContext coctx, + ByteBuffer buf, + int dataOff, + int keyValLen, + CacheObject key, + CacheObject val, + GridCacheVersion ver + ) throws IgniteCheckedException { + try { + buf.position(dataOff); + + buf.putShort((short)keyValLen); + + boolean written = key.putValue(buf, coctx); + + assert written; + + written = val.putValue(buf, coctx); + + assert written; + + buf.putInt(ver.topologyVersion()); + buf.putInt(ver.nodeOrderAndDrIdRaw()); + buf.putLong(ver.globalTime()); + buf.putLong(ver.order()); + } + finally { + buf.position(0); + } + } + + public int getKeyValueSize(ByteBuffer buf, int idx) { + return buf.getShort(getDataOffset(buf, idx)) & 0xFFFF; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2InnerIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2InnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2InnerIO.java new file mode 100644 index 0000000..7fac282 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2InnerIO.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; + +/** + * Inner page for H2 row references. + */ +public class H2InnerIO extends BPlusIOInner<GridH2Row> implements H2RowLinkIO { + /** */ + public static final IORegistry<H2InnerIO> VERSIONS = new IORegistry<>( + new H2InnerIO(1) + ); + + /** + * @param ver Page format version. + */ + private H2InnerIO(int ver) { + super(ver); + } + + /** {@inheritDoc} */ + @Override public final int getType() { + return T_H2_REF_INNER; + } + + /** {@inheritDoc} */ + @Override public void store(ByteBuffer buf, int idx, GridH2Row row) { + assert row.link != 0; + + setLink(buf, idx, row.link); + } + + /** {@inheritDoc} */ + @Override public void store(ByteBuffer dst, int dstIdx, BPlusIO<GridH2Row> srcIo, ByteBuffer src, int srcIdx) { + long link = ((H2RowLinkIO)srcIo).getLink(src, srcIdx); + + setLink(dst, dstIdx, link); + } + + /** {@inheritDoc} */ + @Override public boolean canGetRow() { + return true; // We can get row from link. 
+ } + + /** {@inheritDoc} */ + @Override public long getLink(ByteBuffer buf, int idx) { + assert idx < getCount(buf): idx; + + return buf.getLong(offset(idx, SHIFT_LINK)); + } + + /** {@inheritDoc} */ + @Override public void setLink(ByteBuffer buf, int idx, long link) { + buf.putLong(offset(idx, SHIFT_LINK), link); + + assert getLink(buf, idx) == link; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java new file mode 100644 index 0000000..b86b8b1 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; + +/** + * Leaf page for H2 row references. + */ +public class H2LeafIO extends BPlusIOLeaf<GridH2Row> implements H2RowLinkIO { + /** */ + public static final IORegistry<H2LeafIO> VERSIONS = new IORegistry<>( + new H2LeafIO(1) + ); + + /** + * @param ver Page format version. + */ + protected H2LeafIO(int ver) { + super(ver); + } + + /** {@inheritDoc} */ + @Override public int getType() { + return T_H2_REF_LEAF; + } + + /** {@inheritDoc} */ + @Override public void store(ByteBuffer buf, int idx, GridH2Row row) { + assert row.link != 0; + + setLink(buf, idx, row.link); + } + + /** {@inheritDoc} */ + @Override public void store(ByteBuffer dst, int dstIdx, BPlusIO<GridH2Row> srcIo, ByteBuffer src, int srcIdx) { + long link = ((H2RowLinkIO)srcIo).getLink(src, srcIdx); + + setLink(dst, dstIdx, link); + } + + /** {@inheritDoc} */ + @Override public boolean canGetRow() { + return true; // We can get row from link. 
+ } + + /** {@inheritDoc} */ + @Override public long getLink(ByteBuffer buf, int idx) { + assert idx < getCount(buf): idx; + + return buf.getLong(offset(idx)); + } + + /** {@inheritDoc} */ + @Override public void setLink(ByteBuffer buf, int idx, long link) { + buf.putLong(offset(idx), link); + + assert getLink(buf, idx) == link; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java new file mode 100644 index 0000000..93a7ffc --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import java.nio.ByteBuffer; + +/** + * Row link IO. + */ +public interface H2RowLinkIO { + /** + * @param buf Buffer. 
+ * @param idx Index. + * @return Row link. + */ + public long getLink(ByteBuffer buf, int idx); + + /** + * @param buf Buffer. + * @param idx Index. + * @param link Row link. + */ + public void setLink(ByteBuffer buf, int idx, long link); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/IORegistry.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/IORegistry.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/IORegistry.java new file mode 100644 index 0000000..81a988e --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/IORegistry.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import java.nio.ByteBuffer; + +/** + * Registry for IO versions. + */ +public final class IORegistry<V extends PageIO> { + /** */ + private final V[] vers; + + /** + * @param vers Versions. 
+ */ + @SafeVarargs + public IORegistry(V... vers) { + assert vers != null; + assert vers.length > 0; + + this.vers = vers; + + assert checkVersions(); + } + + /** + * @return {@code true} If versions are correct. + */ + private boolean checkVersions() { + for (int i = 0; i < vers.length; i++) { + if (vers[i].getVersion() != i + 1) + return false; + } + + return true; + } + + /** + * @return Latest IO version. + */ + public V latest() { + return forVersion(vers.length); + } + + /** + * @param ver Version. + * @return IO. + */ + public V forVersion(int ver) { + return vers[ver - 1]; + } + + /** + * @param buf Buffer. + * @return IO. + */ + public V forPage(ByteBuffer buf) { + int ver = PageIO.getVersion(buf); + + V res = forVersion(ver); + + assert res.getType() == PageIO.getType(buf); + + return res; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/PageIO.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/PageIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/PageIO.java new file mode 100644 index 0000000..4c22b8c --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/PageIO.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.io; + +import java.nio.ByteBuffer; + +/** + * Base format for all the page types. + */ +public abstract class PageIO { + /** */ + private static final int TYPE_OFF = 0; + + /** */ + private static final int VER_OFF = TYPE_OFF + 2; + + /** */ + private static final int PAGE_ID_OFF = VER_OFF + 2; + + /** */ + private static final int CRC_OFF = PAGE_ID_OFF + 8; + + /** */ + public static final int COMMON_HEADER_END = 32; // type(2) + ver(2) + pageId(8) + crc(4) + reserved(16) + + /* All the page types. */ + + /** */ + public static final short T_DATA = 1; + + /** */ + public static final short T_BPLUS_META = 2; + + /** */ + public static final short T_H2_REF_LEAF = 3; + + /** */ + public static final short T_H2_REF_INNER = 4; + + /** */ + private final int ver; + + /** + * @param ver Page format version. + */ + protected PageIO(int ver) { + assert ver > 0 && ver < 65535: ver; + + this.ver = ver; + } + + + + /** + * @return Page type. + */ + public static int getType(ByteBuffer buf) { + return buf.getShort(TYPE_OFF) & 0xFFFF; + } + + /** + * @param buf Buffer. + * @param type Type. + */ + public static void setType(ByteBuffer buf, int type) { + assert type >= T_DATA && type <= T_H2_REF_INNER : type; + + buf.putShort(TYPE_OFF, (short)type); + + assert getType(buf) == type; + } + + /** + * @param buf Buffer. + * @return Version. + */ + public static int getVersion(ByteBuffer buf) { + return buf.getShort(VER_OFF) & 0xFFFF; + } + + /** + * @param buf Buffer. + * @param ver Version. 
+ */ + public static void setVersion(ByteBuffer buf, int ver) { + buf.putShort(VER_OFF, (short)ver); + + assert getVersion(buf) == ver; + } + + /** + * @param buf Buffer. + * @return Page ID. + */ + public static long getPageId(ByteBuffer buf) { + return buf.getLong(PAGE_ID_OFF); + } + + /** + * @param buf Buffer. + * @param pageId Page ID. + */ + public static void setPageId(ByteBuffer buf, long pageId) { + buf.putLong(PAGE_ID_OFF, pageId); + + assert getPageId(buf) == pageId; + } + + /** + * @param buf Buffer. + * @return Checksum. + */ + public static int getCrc(ByteBuffer buf) { + return buf.getInt(CRC_OFF); + } + + /** + * @param buf Buffer. + * @param crc Checksum. + */ + public static void setCrc(ByteBuffer buf, int crc) { + buf.putInt(CRC_OFF, crc); + } + + /** + * @return Type. + */ + public abstract int getType(); + + /** + * @return Version. + */ + public final int getVersion() { + return ver; + } + + /** + * @param buf Buffer. + * @param pageId Page ID. + */ + public void initNewPage(ByteBuffer buf, long pageId) { + setType(buf, getType()); + setVersion(buf, getVersion()); + setPageId(buf, pageId); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return getClass().getSimpleName() + "[ver=" + getVersion() + "]"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/util/CompareUtils.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/util/CompareUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/util/CompareUtils.java new file mode 100644 index 0000000..565f566 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/util/CompareUtils.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.util; + +import java.math.BigDecimal; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.util.GridUnsafe; +import org.h2.util.MathUtils; +import org.h2.value.Value; + +
/**
 * Range-checked numeric narrowing conversions and helpers that compare stored values
 * (primitives, or UTF-8 bytes located at a raw memory address) with H2 values and strings.
 */
public class CompareUtils {
    /** Smallest code point that legitimately needs a 2-byte UTF-8 encoding. */
    private static final int UTF_8_MIN_2_BYTES = 0x80;

    /** Smallest code point that legitimately needs a 3-byte UTF-8 encoding. */
    private static final int UTF_8_MIN_3_BYTES = 0x800;

    /** Smallest code point that legitimately needs a 4-byte UTF-8 encoding. */
    private static final int UTF_8_MIN_4_BYTES = 0x10000;

    /** Largest code point accepted by the decoder. */
    private static final int UTF_8_MAX_CODE_POINT = 0x10ffff;

    /** {@link Long#MAX_VALUE} as a decimal, upper bound for decimal-to-long conversion. */
    private static final BigDecimal MAX_LONG_DECIMAL = BigDecimal.valueOf(Long.MAX_VALUE);

    /** {@link Long#MIN_VALUE} as a decimal, lower bound for decimal-to-long conversion. */
    private static final BigDecimal MIN_LONG_DECIMAL = BigDecimal.valueOf(Long.MIN_VALUE);

    /**
     * Narrows a {@code long} to {@code byte}, rejecting values that do not fit.
     *
     * @param x Value.
     * @return Byte value.
     * @throws IgniteException If the value is outside the {@code byte} range.
     */
    public static byte convertToByte(long x) {
        if (x > Byte.MAX_VALUE || x < Byte.MIN_VALUE)
            throw new IgniteException("Numeric value out of range: " + x);

        return (byte) x;
    }

    /**
     * Narrows a {@code long} to {@code short}, rejecting values that do not fit.
     *
     * @param x Value.
     * @return Short value.
     * @throws IgniteException If the value is outside the {@code short} range.
     */
    public static short convertToShort(long x) {
        if (x > Short.MAX_VALUE || x < Short.MIN_VALUE)
            throw new IgniteException("Numeric value out of range: " + x);

        return (short) x;
    }

    /**
     * Narrows a {@code long} to {@code int}, rejecting values that do not fit.
     *
     * @param x Value.
     * @return Int value.
     * @throws IgniteException If the value is outside the {@code int} range.
     */
    public static int convertToInt(long x) {
        if (x > Integer.MAX_VALUE || x < Integer.MIN_VALUE)
            throw new IgniteException("Numeric value out of range: " + x);

        return (int) x;
    }

    /**
     * Rounds a {@code double} to the nearest {@code long} via {@link Math#round(double)}.
     * <p>
     * Note: {@code NaN} passes the range check (every comparison with {@code NaN} is false)
     * and is rounded to {@code 0} by {@link Math#round(double)}.
     *
     * @param x Value.
     * @return Long value.
     * @throws IgniteException If the value is greater than {@link Long#MAX_VALUE} or less
     *      than {@link Long#MIN_VALUE}.
     */
    public static long convertToLong(double x) {
        if (x > Long.MAX_VALUE || x < Long.MIN_VALUE)
            throw new IgniteException("Numeric value out of range: " + x);

        return Math.round(x);
    }

    /**
     * Rounds a decimal half-up to scale 0 and returns it as a {@code long}. The range check
     * is applied to the value before rounding.
     *
     * @param x Value.
     * @return Long value.
     * @throws IgniteException If the value is greater than {@link Long#MAX_VALUE} or less
     *      than {@link Long#MIN_VALUE}.
     */
    public static long convertToLong(BigDecimal x) {
        if (x.compareTo(MAX_LONG_DECIMAL) > 0 || x.compareTo(MIN_LONG_DECIMAL) < 0)
            throw new IgniteException("Numeric value out of range: " + x);

        return x.setScale(0, BigDecimal.ROUND_HALF_UP).longValue();
    }

    /**
     * Compares a boolean with the boolean form of an H2 value; {@code false} sorts
     * before {@code true}.
     *
     * @param v Value1.
     * @param val Value2.
     * @return Compare result: {@code 0} if equal, {@code 1} if {@code v} is {@code true},
     *      {@code -1} otherwise.
     */
    public static int compareBoolean(boolean v, Value val) {
        boolean v2 = val.getBoolean();

        return (v == v2) ? 0 : (v ? 1 : -1);
    }

    /**
     * Compares a byte with the byte form of an H2 value as signed integers.
     *
     * @param v Value1.
     * @param val Value2.
     * @return Compare result.
     */
    public static int compareByte(byte v, Value val) {
        byte v2 = val.getByte();

        return MathUtils.compareInt(v, v2);
    }

    /**
     * Compares two byte sequences lexicographically, treating every byte as unsigned.
     * When one sequence is a prefix of the other, the shorter one sorts first.
     *
     * @param val1Addr First string UTF-8 bytes address.
     * @param val1Len Number of bytes in first string.
     * @param val2Bytes Second string bytes.
     * @param val2Off Second string offset.
     * @param val2Len Number of bytes in second string.
     * @return Compare result.
     * @throws IgniteCheckedException In case of error.
     */
    public static int compareUtf8(long val1Addr,
        int val1Len,
        byte[] val2Bytes,
        int val2Off,
        int val2Len) throws IgniteCheckedException {
        int len = Math.min(val1Len, val2Len);

        for (int i = 0; i < len; i++) {
            int b1 = GridUnsafe.getByte(val1Addr + i) & 0xFF;
            int b2 = val2Bytes[val2Off + i] & 0xFF;

            if (b1 != b2)
                return b1 > b2 ? 1 : -1;
        }

        return Integer.compare(val1Len, val2Len);
    }

    /**
     * Decodes UTF-8 bytes located at a raw address on the fly and compares the resulting
     * sequence of Java chars (UTF-16 code units) with the given string, stopping at the
     * first difference.
     * <p>
     * Malformed input (stray continuation byte, overlong encoding, encoded surrogate,
     * code point above U+10FFFF, or a multi-byte sequence truncated by {@code len}) is
     * rejected. No byte at or beyond {@code addr + len} is ever read.
     *
     * @param addr UTF-8 bytes address.
     * @param len Number of bytes to decode.
     * @param str String to compare with.
     * @return Compare result: the difference of the first mismatching chars, a positive
     *      value if the decoded text is longer than {@code str}, a negative value if it is
     *      a proper prefix of {@code str}, {@code 0} if they are equal.
     * @throws IgniteCheckedException In case of error.
     * @throws IgniteException If the bytes are not a well-formed UTF-8 sequence.
     */
    public static int compareUtf8(long addr, int len, String str) throws IgniteCheckedException {
        int pos = 0;

        int cntr = 0;

        // ASCII only optimized loop: stops at the first byte with the high bit set.
        while (pos < len) {
            byte b = GridUnsafe.getByte(addr + pos);

            if (b < 0)
                break;

            int res = compareChar((char)b, str, cntr);

            if (res != 0)
                return res;

            cntr++;

            pos++;
        }

        while (pos < len) {
            int ch = GridUnsafe.getByte(addr + pos++) & 0xff;

            // Convert UTF-8 to 21-bit codepoint.
            if (ch < 0x80) {
                // 0xxxxxxx -- length 1.
            }
            else if (ch < 0xc0) {
                // 10xxxxxx -- illegal!
                throw new IgniteException("Illegal UTF-8 sequence.");
            }
            else if (ch < 0xe0) {
                // 110xxxxx 10xxxxxx
                ch = ((ch & 0x1f) << 6);

                ch |= continuationBits(addr, pos++, len);

                checkMinimal(ch, UTF_8_MIN_2_BYTES);
            }
            else if (ch < 0xf0) {
                // 1110xxxx 10xxxxxx 10xxxxxx
                ch = ((ch & 0x0f) << 12);

                ch |= continuationBits(addr, pos++, len) << 6;

                ch |= continuationBits(addr, pos++, len);

                checkMinimal(ch, UTF_8_MIN_3_BYTES);
            }
            else if (ch < 0xf8) {
                // 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
                ch = ((ch & 0x07) << 18);

                ch |= continuationBits(addr, pos++, len) << 12;

                ch |= continuationBits(addr, pos++, len) << 6;

                ch |= continuationBits(addr, pos++, len);

                checkMinimal(ch, UTF_8_MIN_4_BYTES);
            }
            else
                throw new IgniteException("Illegal UTF-8 sequence.");

            if (ch > UTF_8_MAX_CODE_POINT)
                throw new IgniteException("Illegal UTF-8 sequence.");

            // Convert 21-bit codepoint to Java chars:
            //   0..ffff are represented directly as a single char
            //   10000..10ffff are represented as a "surrogate pair" of two chars
            if (ch > 0xffff) {
                // Use a surrogate pair to represent it.
                ch -= 0x10000; // ch is now 0..fffff (20 bits)

                // Top 10 bits.
                int res = compareChar((char)(0xd800 + (ch >> 10)), str, cntr);

                if (res != 0)
                    return res;

                cntr++;

                // Bottom 10 bits.
                res = compareChar((char)(0xdc00 + (ch & 0x3ff)), str, cntr);

                if (res != 0)
                    return res;

                cntr++;
            }
            else if (ch >= 0xd800 && ch < 0xe000)
                // Not allowed to encode the surrogate range directly.
                throw new IgniteException("Illegal UTF-8 sequence.");
            else {
                // Normal case.
                int res = compareChar((char)ch, str, cntr);

                if (res != 0)
                    return res;

                cntr++;
            }
        }

        // All decoded chars matched; the decoded text is either equal to str or its prefix.
        return cntr - str.length();
    }

    /**
     * Compares one decoded char with the char of {@code str} at the given index.
     *
     * @param c0 Decoded char.
     * @param str String to compare with.
     * @param idx Index of the char in {@code str} to compare against.
     * @return {@code 0} if the chars are equal, their difference if they differ,
     *      {@code 1} if {@code str} has no char at {@code idx} (decoded text is longer).
     */
    private static int compareChar(char c0, String str, int idx) {
        if (idx >= str.length())
            return 1;

        return c0 - str.charAt(idx);
    }

    /**
     * Reads one UTF-8 continuation byte and returns its 6 payload bits. The position is
     * validated first so that a sequence truncated by {@code len} never causes a read
     * past the end of the value.
     *
     * @param addr UTF-8 bytes address.
     * @param pos Position of the continuation byte.
     * @param len Total number of bytes available.
     * @return Low 6 bits of the continuation byte.
     * @throws IgniteException If the position is out of bounds or the byte is not a
     *      continuation byte.
     */
    private static int continuationBits(long addr, int pos, int len) {
        if (pos >= len)
            throw new IgniteException("Illegal UTF-8 sequence.");

        byte b = GridUnsafe.getByte(addr + pos);

        checkByte(b);

        return b & 0x3f;
    }

    /**
     * Verifies that a byte has the {@code 10xxxxxx} continuation pattern.
     *
     * @param ch UTF-8 byte.
     * @throws IgniteException In case of error.
     */
    private static void checkByte(int ch) {
        if ((ch & 0xc0) != 0x80)
            throw new IgniteException("Illegal UTF-8 sequence.");
    }

    /**
     * Rejects overlong encodings: the decoded value must be at least the minimum that
     * requires the number of bytes actually used.
     *
     * @param ch Decoded code point.
     * @param minVal Minimum value.
     * @throws IgniteException In case of error.
     */
    private static void checkMinimal(int ch, int minVal) {
        if (ch >= minVal)
            return;

        throw new IgniteException("Illegal UTF-8 sequence.");
    }
}
 http://git-wip-us.apache.org/repos/asf/ignite/blob/420cabac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/util/PageHandler.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/util/PageHandler.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/util/PageHandler.java new file mode 100644 index 0000000..8ec3d1e --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/util/PageHandler.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.database.util; + +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.Page; + +/** + * Page handler. Can do {@link #readPage(Page, PageHandler, Object, int, int)} + * and {@link #writePage(Page, PageHandler, Object, int, int)} operations. + */ +public abstract class PageHandler<X> { + /** + * @param page Page. + * @param buf Page buffer. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + public abstract int run(Page page, ByteBuffer buf, X arg, int intArg) throws IgniteCheckedException; + + /** + * @param page Page. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @return {@code true} If release. + */ + public boolean releaseAfterWrite(Page page, X arg, int intArg) { + return true; + } + + /** + * @param page Page. + * @param h Handler. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param dfltRes Default result in case of page invalidation. + * @return Handler result. + * @throws IgniteCheckedException If failed. 
+ */ + public static <X> int readPage(Page page, PageHandler<X> h, X arg, int intArg, int dfltRes) + throws IgniteCheckedException { + if (page == null) + return dfltRes; + + ByteBuffer buf = page.getForRead(); + + if (buf == null) + return dfltRes; + + try { + return h.run(page, buf, arg, intArg); + } + finally { + page.releaseRead(); + } + } + + /** + * @param page Page. + * @param h Handler. + * @param arg Argument. + * @param intArg Argument of type {@code int}. + * @param dfltRes Default result in case of page invalidation. + * @return Handler result. + * @throws IgniteCheckedException If failed. + */ + public static <X> int writePage(Page page, PageHandler<X> h, X arg, int intArg, int dfltRes) + throws IgniteCheckedException { + if (page == null) + return dfltRes; + + int res; + + boolean ok = false; + + ByteBuffer buf = page.getForWrite(); + + if (buf == null) + return dfltRes; + + try { + res = h.run(page, buf, arg, intArg); + + ok = true; + } + finally { + if (h.releaseAfterWrite(page, arg, intArg)) + page.releaseWrite(ok); + } + + return res; + } +}
