This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 58913f683a2e0876aaa54e903babfd4e26166ad8 Author: Wail Alkowaileet <[email protected]> AuthorDate: Fri Aug 16 17:09:34 2024 +0300 [ASTERIXDB-3484] Support storing large values in metadata pages - user model changes: no - storage format changes: yes - interface changes: yes Details: Add the support to store large key-value pairs in the component's metadata page Ext-ref: MB-62875 Change-Id: Ie3f5cb6820b98ef78841c49b13ae33436c71a99a Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18633 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Wail Alkowaileet <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- .../column/large-schema/large-schema.001.ddl.sqlpp | 27 ++ .../large-schema/large-schema.002.update.sqlpp | 31 +++ .../large-schema/large-schema.003.query.sqlpp | 24 ++ .../large-schema/large-schema.004.query.sqlpp | 25 ++ .../column/large-schema/large-schema.003.adm | 1 + .../column/large-schema/large-schema.004.adm | 1 + .../src/test/resources/runtimets/sqlpp_queries.xml | 5 + .../api/compression/ICompressorDecompressor.java | 47 +++- .../am/common/api/IMetadataPageManager.java | 18 +- .../am/common/api/ITreeIndexMetadataFrame.java | 22 +- .../am/common/frames/LIFOMetaDataFrame.java | 14 +- .../AppendOnlyLinkedMetadataPageManager.java | 305 ++++++++++----------- .../common/freepage/LinkedMetaDataPageManager.java | 24 +- .../column/utils/ColumnMetadataReaderWriter.java | 196 +++++++++++++ .../am/lsm/btree/column/utils/ColumnUtil.java | 20 +- .../am/lsm/common/api/IComponentMetadata.java | 20 +- .../am/lsm/common/impls/DiskComponentMetadata.java | 14 +- .../common/impls/EmptyDiskComponentMetadata.java | 2 +- .../lsm/common/impls/MemoryComponentMetadata.java | 20 +- .../compression/NoOpCompressorDecompressor.java | 17 +- .../compression/SnappyCompressorDecompressor.java | 23 +- 21 files changed, 614 insertions(+), 242 deletions(-) diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.001.ddl.sqlpp new file mode 100644 index 0000000000..71a618b0f3 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.001.ddl.sqlpp @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +DROP DATAVERSE test if exists; +CREATE DATAVERSE test; +USE test; + +CREATE DATASET ColumnDataset +PRIMARY KEY (id: int) WITH { + "storage-format": {"format" : "column"} +}; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.002.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.002.update.sqlpp new file mode 100644 index 0000000000..5adbda4432 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.002.update.sqlpp @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +USE test; + +USE test; + +-- This will add 5000 columns +UPSERT INTO ColumnDataset ( + SELECT VALUE object_add_fields({"id": x}, + [{"field-name": "myBadLongGeneratedFieldName" || to_string(x), "field-value":x}]) + FROM RANGE(1, 5000) x +) + + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.003.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.003.query.sqlpp new file mode 100644 index 0000000000..c7fd8b5e2b --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.003.query.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +USE test; + +-- This will read a large schema (chunked into two pieces given that the page size is 32KB) +SELECT VALUE COUNT(*) +FROM ColumnDataset c diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.004.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.004.query.sqlpp new file mode 100644 index 0000000000..cda8ed1448 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/column/large-schema/large-schema.004.query.sqlpp @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +USE test; + +-- Ensure values can be projected with large schemas +SELECT VALUE c.myBadLongGeneratedFieldName751 +FROM ColumnDataset c +WHERE c.myBadLongGeneratedFieldName751 IS NOT UNKNOWN diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.003.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.003.adm new file mode 100644 index 0000000000..e9c02dad18 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.003.adm @@ -0,0 +1 @@ +5000 diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.004.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.004.adm new file mode 100644 index 0000000000..29988c8020 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/column/large-schema/large-schema.004.adm @@ -0,0 +1 @@ +751 diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml index 2aba75859b..509d90b2d9 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/sqlpp_queries.xml @@ -16598,6 +16598,11 @@ <source-location>false</source-location> </compilation-unit> </test-case> + <test-case FilePath="column"> + <compilation-unit name="large-schema"> + <output-dir compare="Text">large-schema</output-dir> + </compilation-unit> + </test-case> <test-case FilePath="column"> <compilation-unit name="validation"> <output-dir compare="Text">validation</output-dir> diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java index 
3a7e901c14..71298976cc 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java @@ -24,41 +24,60 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; /** * An API for block compressor/decompressor. - * + * <p> * Note: Should never allocate any buffer in compress/uncompress operations and it must be stateless to be thread safe. */ public interface ICompressorDecompressor { /** * Computes the required buffer size for <i>compress()</i>. * - * @param uBufferSize - * The size of the uncompressed buffer. + * @param uBufferSize The size of the uncompressed buffer. * @return The required buffer size for compression */ int computeCompressedBufferSize(int uBufferSize); + /** + * Compress <i>src</i> into <i>dest</i> + * + * @param src Uncompressed source buffer + * @param srcOffset Source offset + * @param srcLen Source length + * @param dest Destination buffer + * @param destOffset Destination offset + * @return compressed length + */ + int compress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset) throws HyracksDataException; + /** * Compress <i>uBuffer</i> into <i>cBuffer</i> * - * @param uBuffer - * Uncompressed source buffer - * @param cBuffer - * Compressed destination buffer + * @param uBuffer Uncompressed source buffer + * @param cBuffer Compressed destination buffer * @return Buffer after compression. 
({@link ByteBuffer#limit()} is set to the compressed size - * @throws HyracksDataException */ ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException; + /** + * Uncompress <i>src</i> into <i>dest</i> + * + * @param src Compressed source + * @param srcOffset Source offset + * @param srcLen Source length + * @param dest Destination buffer + * @param destOffset Destination offset + * @return uncompressed length + * @throws HyracksDataException An exception will be thrown if the <i>dest</i> size is not sufficient. + */ + int uncompress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset) throws HyracksDataException; + /** * Uncompress <i>cBuffer</i> into <i>uBuffer</i> * - * @param cBuffer - * Compressed source buffer - * @param uBuffer - * Uncompressed destination buffer + * @param cBuffer Compressed source buffer + * @param uBuffer Uncompressed destination buffer * @return Buffer after decompression. ({@link ByteBuffer#limit()} is set to the uncompressed size - * @throws HyracksDataException - * An exception will be thrown if the <i>uBuffer</i> size is not sufficient. + * @throws HyracksDataException An exception will be thrown if the <i>uBuffer</i> size is not sufficient. 
*/ ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException; + } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetadataPageManager.java index 58c837ba03..7909ed1519 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetadataPageManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IMetadataPageManager.java @@ -29,6 +29,7 @@ import org.apache.hyracks.data.std.api.IValueReference; public interface IMetadataPageManager extends IPageManager { /** * put the key value pair in the metadata page using the passed frame + * * @param frame * @param key * @param value @@ -38,19 +39,22 @@ public interface IMetadataPageManager extends IPageManager { /** * get the value of the key from the metadata page using the passed frame + * * @param frame * @param key * @param value + * @return true if the key exists, false otherwise * @throws HyracksDataException */ - void get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value) throws HyracksDataException; + boolean get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value) throws HyracksDataException; /** - * @param frame - * @param key - * @return The byte offset in the index file for the entry with the passed key if the index is valid and the key - * exists, returns -1 otherwise. 
use the passed frame to read the metadata page - * @throws HyracksDataException + * @return page size + */ + int getPageSize(); + + /** + * @return free space of the current page */ - long getFileOffset(ITreeIndexMetadataFrame frame, IValueReference key) throws HyracksDataException; + int getFreeSpace() throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetadataFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetadataFrame.java index 7efc4696e4..a898d59bc3 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetadataFrame.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITreeIndexMetadataFrame.java @@ -36,6 +36,7 @@ public interface ITreeIndexMetadataFrame { /** * Set the page in the frame + * * @param page */ void setPage(ICachedPage page); @@ -53,6 +54,7 @@ public interface ITreeIndexMetadataFrame { /** * Set the page level + * * @param level */ void setLevel(byte level); @@ -60,12 +62,14 @@ public interface ITreeIndexMetadataFrame { /** * Get the next metadata page if this page is linked to other metadata pages * Return a negative value otherwise + * * @return */ int getNextMetadataPage(); /** * Link this metadata page to another one + * * @param nextPage */ void setNextMetadataPage(int nextPage); @@ -77,37 +81,44 @@ public interface ITreeIndexMetadataFrame { /** * Set the max page of the file + * * @param maxPage */ void setMaxPage(int maxPage); /** * Get a free page from the page + * * @return */ int getFreePage(); /** * Get the remaining space in the metadata page + * * @return */ int getSpace(); /** * add a new free page to the metadata page + * * @param freePage */ void addFreePage(int freePage); /** * get the value with the 
key = key + * * @param key * @param value + * @return true if the key exists, false otherwise */ - void get(IValueReference key, IPointable value); + boolean get(IValueReference key, IPointable value); /** * set the value with the key = key + * * @param key * @param value * @throws HyracksDataException @@ -121,18 +132,21 @@ public interface ITreeIndexMetadataFrame { /** * Sets the index to be valid in the metadata page + * * @param valid */ void setValid(boolean valid); /** * Get the storage version associated with this index + * * @return */ int getVersion(); /** * Set the index root page id + * * @param rootPage */ void setRootPageId(int rootPage); @@ -149,6 +163,7 @@ public interface ITreeIndexMetadataFrame { /** * return the offset to the entry of the passed key, -1, otherwise + * * @param key */ int getOffset(IValueReference key); @@ -162,4 +177,9 @@ public interface ITreeIndexMetadataFrame { * @return true if the inspected page is a free page, false otherwise */ boolean isFreePage(); + + /** + * @return the overhead (in bytes) to store a key-value pair + */ + int getKeyValueStorageOverhead(); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java index d0757c8ada..62251d95f7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java @@ -36,7 +36,6 @@ import org.apache.hyracks.storage.common.buffercache.ICachedPage; * .... * .... 
* [free page 5][free page 4][free page 3][free page 2][free page 1] - * */ public class LIFOMetaDataFrame implements ITreeIndexMetadataFrame { @@ -167,17 +166,22 @@ public class LIFOMetaDataFrame implements ITreeIndexMetadataFrame { } @Override - public void get(IValueReference key, IPointable value) { + public int getKeyValueStorageOverhead() { + return Integer.BYTES * 2; + } + + @Override + public boolean get(IValueReference key, IPointable value) { int tupleCount = getTupleCount(); int tupleStart = getTupleStart(0); for (int i = 0; i < tupleCount; i++) { if (isInner(key, tupleStart)) { get(tupleStart + key.getLength() + Integer.BYTES, value); - return; + return true; } tupleStart = getNextTupleStart(tupleStart); } - value.set(null, 0, 0); + return false; } private int find(IValueReference key) { @@ -197,7 +201,7 @@ public class LIFOMetaDataFrame implements ITreeIndexMetadataFrame { value.set(buf.array(), offset + Integer.BYTES, valueLength); } - private static final int compare(byte[] b1, int s1, byte[] b2, int s2, int l) { + private static int compare(byte[] b1, int s1, byte[] b2, int s2, int l) { for (int i = 0; i < l; i++) { if (b1[s1 + i] != b2[s2 + i]) { return b1[s1 + i] - b2[s2 + i]; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java index 852c8b5dce..dae01bfe88 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java @@ -18,6 +18,9 @@ */ package org.apache.hyracks.storage.am.common.freepage; +import java.util.ArrayList; 
+import java.util.List; + import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IPointable; @@ -40,108 +43,36 @@ import org.apache.hyracks.storage.common.file.BufferedFileHandle; public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager { private final IBufferCache bufferCache; + private final ITreeIndexMetadataFrameFactory frameFactory; + private final List<ICachedPage> metadataPages; private int metadataPage = IBufferCache.INVALID_PAGEID; private int fileId = -1; - private final ITreeIndexMetadataFrameFactory frameFactory; - private ICachedPage confiscatedPage; + private ICachedPage currentPage; + private ICachedPage firstPage; private boolean ready = false; public AppendOnlyLinkedMetadataPageManager(IBufferCache bufferCache, ITreeIndexMetadataFrameFactory frameFactory) { this.bufferCache = bufferCache; this.frameFactory = frameFactory; + metadataPages = new ArrayList<>(); } @Override - public void releasePage(ITreeIndexMetadataFrame metaFrame, int freePageNum) throws HyracksDataException { - ICachedPage metaPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId())); - metaPage.acquireWriteLatch(); - try { - metaFrame.setPage(metaPage); - if (metaFrame.getSpace() > Integer.BYTES) { - metaFrame.addFreePage(freePageNum); - } else { - int newPageNum = metaFrame.getFreePage(); - if (newPageNum < 0) { - throw new HyracksDataException( - "Inconsistent Meta Page State. 
It has no space, but it also has no entries."); - } - ICachedPage newNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, newPageNum)); - newNode.acquireWriteLatch(); - try { - int metaMaxPage = metaFrame.getMaxPage(); - System.arraycopy(metaPage.getBuffer().array(), 0, newNode.getBuffer().array(), 0, - metaPage.getBuffer().capacity()); - metaFrame.init(); - metaFrame.setNextMetadataPage(newPageNum); - metaFrame.setMaxPage(metaMaxPage); - metaFrame.addFreePage(freePageNum); - } finally { - newNode.releaseWriteLatch(true); - bufferCache.unpin(newNode); - } - } - } finally { - metaPage.releaseWriteLatch(true); - bufferCache.unpin(metaPage); - } + public void releasePage(ITreeIndexMetadataFrame metaFrame, int freePageNum) { + throw new IllegalAccessError("On-disk pages must be immutable"); } @Override - public void releaseBlock(ITreeIndexMetadataFrame metaFrame, int startingPage, int count) - throws HyracksDataException { - for (int i = 0; i < count; i++) { - releasePage(metaFrame, startingPage + i); - } + public void releaseBlock(ITreeIndexMetadataFrame metaFrame, int startingPage, int count) { + throw new IllegalAccessError("On-disk pages must be immutable"); } @Override public int takePage(ITreeIndexMetadataFrame metaFrame) throws HyracksDataException { - confiscatedPage.acquireWriteLatch(); - int freePage = IBufferCache.INVALID_PAGEID; - try { - metaFrame.setPage(confiscatedPage); - freePage = metaFrame.getFreePage(); - if (freePage < 0) { // no free page entry on this page - int nextPage = metaFrame.getNextMetadataPage(); - if (nextPage > 0) { // sibling may have free pages - ICachedPage nextNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, nextPage)); - nextNode.acquireWriteLatch(); - // we copy over the free space entries of nextpage into the - // first meta page (metaDataPage) - // we need to link the first page properly to the next page - // of nextpage - try { - // remember entries that remain unchanged - int maxPage = 
metaFrame.getMaxPage(); - // copy entire page (including sibling pointer, free - // page entries, and all other info) - // after this copy nextPage is considered a free page - System.arraycopy(nextNode.getBuffer().array(), 0, confiscatedPage.getBuffer().array(), 0, - nextNode.getBuffer().capacity()); - // reset unchanged entry - metaFrame.setMaxPage(maxPage); - freePage = metaFrame.getFreePage(); - // sibling also has no free pages, this "should" not - // happen, but we deal with it anyway just to be safe - if (freePage < 0) { - freePage = nextPage; - } else { - metaFrame.addFreePage(nextPage); - } - } finally { - nextNode.releaseWriteLatch(true); - bufferCache.unpin(nextNode); - } - } else { - freePage = metaFrame.getMaxPage(); - freePage++; - metaFrame.setMaxPage(freePage); - } - } - } finally { - confiscatedPage.releaseWriteLatch(false); - } - return freePage; + metaFrame.setPage(firstPage); + int maxPage = metaFrame.getMaxPage() + 1; + metaFrame.setMaxPage(maxPage); + return maxPage; } @Override @@ -154,23 +85,22 @@ public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager @Override public int getMaxPageId(ITreeIndexMetadataFrame metaFrame) throws HyracksDataException { ICachedPage metaNode; - if (confiscatedPage == null) { + if (firstPage == null) { int mdPage = getMetadataPageId(); if (mdPage < 0) { return IBufferCache.INVALID_PAGEID; } metaNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, mdPage)); } else { - metaNode = confiscatedPage; + metaNode = firstPage; } - metaNode.acquireReadLatch(); + int maxPage = -1; try { metaFrame.setPage(metaNode); maxPage = metaFrame.getMaxPage(); } finally { - metaNode.releaseReadLatch(); - if (confiscatedPage == null) { + if (firstPage == null) { bufferCache.unpin(metaNode); } } @@ -195,48 +125,31 @@ public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager int pages = bufferCache.getNumPagesOfFile(fileId); //if there are no pages in the file yet, we're just 
initializing if (pages == 0) { - if (confiscatedPage != null) { + if (firstPage != null) { throw new HyracksDataException("Metadata Page Manager is already initialized"); } ITreeIndexMetadataFrame metaFrame = createMetadataFrame(); - ICachedPage metaNode = bufferCache.confiscatePage(BufferCache.INVALID_DPID); - try { - metaFrame.setPage(metaNode); - metaFrame.init(); - metaFrame.setMaxPage(-1); - } finally { - confiscatedPage = metaNode; - } + // First to confiscate + confiscateNext(metaFrame); + firstPage = currentPage; + metaFrame.setMaxPage(-1); } } @Override public void close(IPageWriteFailureCallback failureCallback) throws HyracksDataException { if (ready) { - IFIFOPageWriter pageWriter = bufferCache.createFIFOWriter(NoOpPageWriteCallback.INSTANCE, failureCallback, - DefaultBufferCacheWriteContext.INSTANCE); - ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame(); - confiscatedPage.acquireWriteLatch(); - try { - metaFrame.setPage(confiscatedPage); - metaFrame.setValid(true); - } finally { - confiscatedPage.releaseWriteLatch(false); - } - int finalMetaPage = getMaxPageId(metaFrame) + 1; - confiscatedPage.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalMetaPage)); - final ICompressedPageWriter compressedPageWriter = bufferCache.getCompressedPageWriter(fileId); - compressedPageWriter.prepareWrite(confiscatedPage); - // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page - // won't be flushed to disk because it won't be dirty until the write latch has been released. 
- pageWriter.write(confiscatedPage); - compressedPageWriter.endWriting(); + persist(failureCallback); metadataPage = getMetadataPageId(); ready = false; - } else if (confiscatedPage != null) { - bufferCache.returnPage(confiscatedPage, false); + } else if (!metadataPages.isEmpty()) { + for (ICachedPage page : metadataPages) { + bufferCache.returnPage(page, false); + } } - confiscatedPage = null; + currentPage = null; + firstPage = null; + metadataPages.clear(); } /** @@ -270,32 +183,26 @@ public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager @Override public void setRootPageId(int rootPage) throws HyracksDataException { ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame(); - confiscatedPage.acquireWriteLatch(); - try { - metaFrame.setPage(confiscatedPage); - metaFrame.setRootPageId(rootPage); - } finally { - confiscatedPage.releaseWriteLatch(false); - } + metaFrame.setPage(firstPage); + metaFrame.setRootPageId(rootPage); ready = true; } @Override public int getRootPageId() throws HyracksDataException { ICachedPage metaNode; - if (confiscatedPage == null) { + if (firstPage == null) { metaNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId())); } else { - metaNode = confiscatedPage; + metaNode = firstPage; } ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame(); - metaNode.acquireReadLatch(); + try { metaFrame.setPage(metaNode); return metaFrame.getRootPageId(); } finally { - metaNode.releaseReadLatch(); - if (confiscatedPage == null) { + if (firstPage == null) { bufferCache.unpin(metaNode); } } @@ -309,58 +216,122 @@ public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager @Override public void put(ITreeIndexMetadataFrame frame, IValueReference key, IValueReference value) throws HyracksDataException { - if (confiscatedPage == null) { + if (currentPage == null) { throw HyracksDataException.create(ErrorCode.ILLEGAL_WRITE_AFTER_FLUSH_ATTEMPT); } - 
confiscatedPage.acquireWriteLatch(); - try { - frame.setPage(confiscatedPage); - frame.put(key, value); - } finally { - confiscatedPage.releaseWriteLatch(false); + + frame.setPage(currentPage); + + if (frame.getSpace() < key.getLength() + value.getLength() + frame.getKeyValueStorageOverhead()) { + // If there's no space, confiscate an extra page + confiscateNext(frame); + } + + frame.put(key, value); + if (frame.getSpace() == 0) { + /* + * Most likely a user is writing chunks, confiscate a new page so the next call to + * getFreeSpace() will not return 0. + */ + confiscateNext(frame); + } + } + + @Override + public boolean get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value) + throws HyracksDataException { + int nextPage = getNextPageId(frame, -1); + while (nextPage != -1) { + ICachedPage page = pinPage(nextPage); + try { + frame.setPage(page); + if (frame.get(key, value)) { + return true; + } + nextPage = getNextPageId(frame, nextPage); + } finally { + unpinPage(page); + } + } + + // To preserve the old behavior + value.set(null, 0, 0); + return false; + } + + @Override + public int getPageSize() { + return bufferCache.getPageSize(); + } + + @Override + public int getFreeSpace() throws HyracksDataException { + if (currentPage == null) { + throw HyracksDataException.create(ErrorCode.ILLEGAL_WRITE_AFTER_FLUSH_ATTEMPT); } + ITreeIndexMetadataFrame frame = createMetadataFrame(); + frame.setPage(currentPage); + return frame.getSpace() - frame.getKeyValueStorageOverhead(); } - private ICachedPage pinPage() throws HyracksDataException { - return confiscatedPage == null ? bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId())) - : confiscatedPage; + private int getNextPageId(ITreeIndexMetadataFrame frame, int previousPageIdx) throws HyracksDataException { + if (metadataPages.isEmpty()) { + // Read-only (immutable) + return previousPageIdx == -1 ? 
getMetadataPageId() : frame.getNextMetadataPage(); + } + + // Write (still mutable) + int nextPageIdx = previousPageIdx + 1; + return nextPageIdx < metadataPages.size() ? nextPageIdx : -1; + } + + private ICachedPage pinPage(int pageId) throws HyracksDataException { + if (!metadataPages.isEmpty()) { + return metadataPages.get(pageId); + } + + return bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId)); } private void unpinPage(ICachedPage page) throws HyracksDataException { - if (confiscatedPage == null) { + if (metadataPages.isEmpty()) { bufferCache.unpin(page); } } - @Override - public void get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value) throws HyracksDataException { - ICachedPage page = pinPage(); - page.acquireReadLatch(); + private void confiscateNext(ITreeIndexMetadataFrame metaFrame) throws HyracksDataException { + ICachedPage metaNode = bufferCache.confiscatePage(BufferCache.INVALID_DPID); try { - frame.setPage(page); - frame.get(key, value); + metaFrame.setPage(metaNode); + metaFrame.init(); } finally { - page.releaseReadLatch(); - unpinPage(page); + metadataPages.add(metaNode); + currentPage = metaNode; } } - @Override - public long getFileOffset(ITreeIndexMetadataFrame frame, IValueReference key) throws HyracksDataException { - int pageId = getMetadataPageId(); - if (pageId != IBufferCache.INVALID_PAGEID) { - ICachedPage page = pinPage(); - page.acquireReadLatch(); - try { - frame.setPage(page); - int inPageOffset = frame.getOffset(key); - return inPageOffset >= 0 ? 
((long) pageId * bufferCache.getPageSizeWithHeader()) + frame.getOffset(key) - + IBufferCache.RESERVED_HEADER_BYTES : -1L; - } finally { - page.releaseReadLatch(); - unpinPage(page); - } + private void persist(IPageWriteFailureCallback failureCallback) throws HyracksDataException { + IFIFOPageWriter pageWriter = bufferCache.createFIFOWriter(NoOpPageWriteCallback.INSTANCE, failureCallback, + DefaultBufferCacheWriteContext.INSTANCE); + ITreeIndexMetadataFrame metaFrame = frameFactory.createFrame(); + // Last page will have nextPage as -1 + int nextPage = -1; + int pageId = getMaxPageId(metaFrame) + 1; + final ICompressedPageWriter compressedPageWriter = bufferCache.getCompressedPageWriter(fileId); + + // Write pages in reverse order (first confiscated page will be the last one to be written) + for (int i = metadataPages.size() - 1; i >= 0; i--) { + ICachedPage page = metadataPages.get(i); + metaFrame.setPage(page); + metaFrame.setNextMetadataPage(nextPage); + // The validity bit matters in the last written page only. No harm for setting this flag for all pages. 
+ metaFrame.setValid(true); + + page.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, pageId)); + compressedPageWriter.prepareWrite(page); + pageWriter.write(page); + nextPage = pageId++; } - return -1L; + compressedPageWriter.endWriting(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java index edc6d10221..fbb6b5fe74 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java @@ -319,24 +319,18 @@ public class LinkedMetaDataPageManager implements IMetadataPageManager { } @Override - public void get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value) throws HyracksDataException { + public boolean get(ITreeIndexMetadataFrame frame, IValueReference key, IPointable value) + throws HyracksDataException { throw new HyracksDataException("Unsupported Operation"); } @Override - public long getFileOffset(ITreeIndexMetadataFrame frame, IValueReference key) throws HyracksDataException { - int metadataPageNum = getMetadataPageId(); - if (metadataPageNum != IBufferCache.INVALID_PAGEID) { - ICachedPage metaNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, getMetadataPageId())); - metaNode.acquireReadLatch(); - try { - frame.setPage(metaNode); - return ((long) metadataPageNum * bufferCache.getPageSizeWithHeader()) + frame.getOffset(key); - } finally { - metaNode.releaseReadLatch(); - bufferCache.unpin(metaNode); - } - } - return -1; + public int getPageSize() { + return bufferCache.getPageSize(); + } + + @Override + public int getFreeSpace() throws 
HyracksDataException { + throw new HyracksDataException("Unsupported Operation"); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnMetadataReaderWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnMetadataReaderWriter.java new file mode 100644 index 0000000000..39a6dd276e --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnMetadataReaderWriter.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.am.lsm.btree.column.utils; + +import org.apache.hyracks.api.compression.ICompressorDecompressor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.IntegerPointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference; +import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManager; +import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata; +import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.LSMColumnBTree; +import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata; +import org.apache.hyracks.storage.common.compression.SnappyCompressorDecompressor; +import org.apache.hyracks.util.annotations.ThreadSafe; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * A Reader/Writer for {@link IColumnMetadata} + */ +@ThreadSafe +final class ColumnMetadataReaderWriter { + private static final Logger LOGGER = LogManager.getLogger(); + /** + * The header consists of two integers: [originalLength | compressedLength] + */ + private static final int CHUNK_HEADER_SIZE = Integer.BYTES * 2; + /** + * Used to get the columns info from {@link IComponentMetadata#get(IValueReference, ArrayBackedValueStorage)} + * + * @see LSMColumnBTree#activate() + * @see IColumnManager#activate(IValueReference) + */ + private static final MutableArrayValueReference COLUMNS_METADATA_KEY = + new MutableArrayValueReference("COLUMNS_METADATA".getBytes()); + + /** + * The default (and only) compressor today is 'snappy'. In the future, this could be changed. + * Old indexes should still use snappy. But new indexes can take whatever {@link ICompressorDecompressor} passed + * to it. 
+ */ + private final ICompressorDecompressor compressorDecompressor; + + /** + * This is currently {@link ThreadSafe} since {@link SnappyCompressorDecompressor#INSTANCE} is thread safe. If the + * {@link ICompressorDecompressor} is modified or changed, the modifier should ensure that either the new + * {@link ICompressorDecompressor} is thread safe or the users of this class should create their own instances. + */ + public ColumnMetadataReaderWriter() { + compressorDecompressor = SnappyCompressorDecompressor.INSTANCE; + } + + /** + * Writes the metadata. If the metadata is 'large', then it will be compressed and stored in chunks + * + * @param metadata to write + * @param componentMetadata to store the metadata at + */ + public void writeMetadata(IValueReference metadata, IComponentMetadata componentMetadata) + throws HyracksDataException { + int requiredLength = COLUMNS_METADATA_KEY.getLength() + metadata.getLength(); + if (componentMetadata.getAvailableSpace() >= requiredLength) { + componentMetadata.put(COLUMNS_METADATA_KEY, metadata); + } else { + LOGGER.debug("Writing large column metadata of size {} bytes", requiredLength); + writeChunks(metadata, componentMetadata); + } + } + + /** + * Read the metadata. 
If the metadata is chunked, it will be assembled back to its original form + * + * @param componentMetadata source + * @return read metadata + */ + public IValueReference readMetadata(IComponentMetadata componentMetadata) throws HyracksDataException { + ArrayBackedValueStorage storage = new ArrayBackedValueStorage(); + storage.reset(); + + if (!componentMetadata.get(COLUMNS_METADATA_KEY, storage)) { + readChunks(componentMetadata, storage); + } + + return storage; + } + + private void writeChunks(IValueReference metadata, IComponentMetadata componentMetadata) + throws HyracksDataException { + ArrayBackedValueStorage key = new ArrayBackedValueStorage(COLUMNS_METADATA_KEY.getLength() + Integer.BYTES); + int originalLength = metadata.getLength(); + + int requiredSize = compressorDecompressor.computeCompressedBufferSize(originalLength); + ArrayBackedValueStorage compressed = new ArrayBackedValueStorage(requiredSize + CHUNK_HEADER_SIZE); + + // Write the compressed content after CHUNK_HEADER_SIZE + int compressedLength = compressorDecompressor.compress(metadata.getByteArray(), 0, originalLength, + compressed.getByteArray(), CHUNK_HEADER_SIZE); + // Set the size to be the header size + compressedLength + compressed.setSize(CHUNK_HEADER_SIZE + compressedLength); + // Serialize the original length + IntegerPointable.setInteger(compressed.getByteArray(), 0, originalLength); + // Serialize the compressed length + IntegerPointable.setInteger(compressed.getByteArray(), Integer.BYTES, compressedLength); + + // Write chunks + VoidPointable chunk = new VoidPointable(); + int position = 0; + int chunkId = 0; + int keyLength = COLUMNS_METADATA_KEY.getLength() + Integer.BYTES; + int totalLength = compressed.getLength(); + while (position < totalLength) { + int remaining = totalLength - position; + int freeSpace = componentMetadata.getAvailableSpace() - keyLength; + // Find the largest chunk size that can be written + int chunkLength = Math.min(remaining, freeSpace); + // Prepare a 
chunk + chunk.set(compressed.getByteArray(), position, chunkLength); + // Write a chunk + componentMetadata.put(getChunkKey(chunkId++, key), chunk); + position += chunkLength; + } + } + + private void readChunks(IComponentMetadata componentMetadata, ArrayBackedValueStorage chunk) + throws HyracksDataException { + ArrayBackedValueStorage key = new ArrayBackedValueStorage(COLUMNS_METADATA_KEY.getLength() + Integer.BYTES); + ArrayBackedValueStorage compressed = new ArrayBackedValueStorage(); + // Ensure large buffer to avoid enlarging the storage multiple times + chunk.setSize(componentMetadata.getPageSize()); + + int chunkId = 0; + // Read the header + the first chunk + chunk.reset(); + componentMetadata.get(getChunkKey(chunkId++, key), chunk); + int originalLength = IntegerPointable.getInteger(chunk.getByteArray(), 0); + int compressedLength = IntegerPointable.getInteger(chunk.getByteArray(), Integer.BYTES); + // Append the first chunk without the header + compressed.append(chunk.getByteArray(), CHUNK_HEADER_SIZE, chunk.getLength() - CHUNK_HEADER_SIZE); + // Read the remaining chunks + int remainingLength = compressedLength - compressed.getLength(); + while (remainingLength > 0) { + chunk.reset(); + // Get the next chunk + componentMetadata.get(getChunkKey(chunkId++, key), chunk); + // Append the next chunk + compressed.append(chunk); + remainingLength -= chunk.getLength(); + } + + // Decompress 'compressed' + int requiredSize = compressorDecompressor.computeCompressedBufferSize(originalLength); + // Ensure the size + chunk.setSize(requiredSize); + int uncompressedLength = compressorDecompressor.uncompress(compressed.getByteArray(), 0, compressedLength, + chunk.getByteArray(), 0); + if (uncompressedLength != originalLength) { + throw new IllegalStateException("Uncompressed size mismatch (original: " + originalLength + + ", uncompressed: " + uncompressedLength + ")"); + } + + // Set the original length + chunk.setSize(originalLength); + } + + private static 
IValueReference getChunkKey(int chunkId, ArrayBackedValueStorage storage) + throws HyracksDataException { + if (chunkId == 0) { + // First chunk. Append the key prefix + set the size + storage.reset(); + storage.append(COLUMNS_METADATA_KEY); + storage.setSize(COLUMNS_METADATA_KEY.getLength() + Integer.BYTES); + } + + IntegerPointable.setInteger(storage.getByteArray(), COLUMNS_METADATA_KEY.getLength(), chunkId); + return storage; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java index fc1e4603ba..95edced0a7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/ColumnUtil.java @@ -23,36 +23,24 @@ import java.nio.ByteBuffer; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.HyracksConstants; import org.apache.hyracks.data.std.api.IValueReference; -import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; -import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference; -import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnManager; import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector; -import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.LSMColumnBTree; import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata; import org.apache.hyracks.storage.common.IIndexAccessParameters; public class ColumnUtil { - /** - * Used to get the columns info from {@link IComponentMetadata#get(IValueReference, ArrayBackedValueStorage)} - * - * @see 
LSMColumnBTree#activate() - * @see IColumnManager#activate(IValueReference) - */ - private static final MutableArrayValueReference COLUMNS_METADATA_KEY = - new MutableArrayValueReference("COLUMNS_METADATA".getBytes()); + // Currently, ColumnMetadataReaderWriter is thread safe as the snappy compressor/decompressor is thread safe + private static final ColumnMetadataReaderWriter READER_WRITER = new ColumnMetadataReaderWriter(); private ColumnUtil() { } public static IValueReference getColumnMetadataCopy(IComponentMetadata src) throws HyracksDataException { - ArrayBackedValueStorage storage = new ArrayBackedValueStorage(); - src.get(COLUMNS_METADATA_KEY, storage); - return storage; + return READER_WRITER.readMetadata(src); } public static void putColumnsMetadataValue(IValueReference columnsMetadataValue, IComponentMetadata dest) throws HyracksDataException { - dest.put(COLUMNS_METADATA_KEY, columnsMetadataValue); + READER_WRITER.writeMetadata(columnsMetadataValue, dest); } public static int getColumnPageIndex(int columnOffset, int pageSize) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java index fa69d7a6f7..8cdc064274 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IComponentMetadata.java @@ -23,23 +23,31 @@ import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; public interface IComponentMetadata { + /** + * @return page size + */ + int getPageSize(); + + /** + * @return the available space to store a value + */ + int getAvailableSpace() 
throws HyracksDataException; /** * Put the key value pair in this metadata, overwrite if it exists * * @param key * @param value - * @throws HyracksDataException - * if the component is immutable + * @throws HyracksDataException if the component is immutable */ void put(IValueReference key, IValueReference value) throws HyracksDataException; /** * Get the value of the key from the metadata, 0 length value if not exists * - * @param key - * @param value - * @throws HyracksDataException + * @param key of the value + * @param storage storage used to store the retrieved value + * @return true if the key exists, false otherwise */ - void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException; + boolean get(IValueReference key, ArrayBackedValueStorage storage) throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java index 649989cb08..bed72a541a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java @@ -32,14 +32,24 @@ public class DiskComponentMetadata implements IComponentMetadata { this.mdpManager = mdpManager; } + @Override + public int getPageSize() { + return mdpManager.getPageSize(); + } + + @Override + public int getAvailableSpace() throws HyracksDataException { + return mdpManager.getFreeSpace(); + } + @Override public void put(IValueReference key, IValueReference value) throws HyracksDataException { mdpManager.put(mdpManager.createMetadataFrame(), key, value); } @Override - public void get(IValueReference key, 
ArrayBackedValueStorage value) throws HyracksDataException { - mdpManager.get(mdpManager.createMetadataFrame(), key, value); + public boolean get(IValueReference key, ArrayBackedValueStorage storage) throws HyracksDataException { + return mdpManager.get(mdpManager.createMetadataFrame(), key, storage); } public void put(MemoryComponentMetadata metadata) throws HyracksDataException { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java index d0fe8a9ea0..53146589bf 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyDiskComponentMetadata.java @@ -35,7 +35,7 @@ public class EmptyDiskComponentMetadata extends DiskComponentMetadata { } @Override - public void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException { + public boolean get(IValueReference key, ArrayBackedValueStorage storage) throws HyracksDataException { throw new IllegalStateException("Attempt to read metadata of empty component"); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java index 0c2167ff6a..b90b37c214 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java @@ -38,6 +38,16 @@ public class MemoryComponentMetadata implements IComponentMetadata { private final List<org.apache.commons.lang3.tuple.Pair<IValueReference, ArrayBackedValueStorage>> store = new ArrayList<>(); + @Override + public int getPageSize() { + return -1; + } + + @Override + public int getAvailableSpace() throws HyracksDataException { + return Integer.MAX_VALUE; + } + /** * Note: for memory metadata, it is expected that the key will be constant * @@ -64,14 +74,18 @@ public class MemoryComponentMetadata implements IComponentMetadata { * @throws HyracksDataException */ @Override - public void get(IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException { + public boolean get(IValueReference key, ArrayBackedValueStorage storage) throws HyracksDataException { lock.readLock().lock(); try { - value.reset(); + storage.reset(); ArrayBackedValueStorage stored = get(key); if (stored != null) { - value.append(stored); + storage.append(stored); + return true; } + + // Key does not exist + return false; } finally { lock.readLock().unlock(); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java index c4855bd0a7..943b2b786c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java @@ -21,7 +21,6 @@ package org.apache.hyracks.storage.common.compression; import java.nio.ByteBuffer; import 
org.apache.hyracks.api.compression.ICompressorDecompressor; -import org.apache.hyracks.api.exceptions.HyracksDataException; public class NoOpCompressorDecompressor implements ICompressorDecompressor { public static final NoOpCompressorDecompressor INSTANCE = new NoOpCompressorDecompressor(); @@ -35,12 +34,24 @@ public class NoOpCompressorDecompressor implements ICompressorDecompressor { } @Override - public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException { + public int compress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset) { + System.arraycopy(src, srcOffset, dest, destOffset, srcLen); + return srcLen; + } + + @Override + public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) { return uBuffer; } @Override - public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException { + public int uncompress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset) { + System.arraycopy(src, srcOffset, dest, destOffset, srcLen); + return srcLen; + } + + @Override + public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) { return cBuffer; } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java index 16c9a2d529..e3274addd7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java @@ -29,10 +29,9 @@ import org.xerial.snappy.Snappy; * Built-in Snappy compressor/decompressor wrapper */ public class SnappyCompressorDecompressor implements ICompressorDecompressor { - 
protected static final SnappyCompressorDecompressor INSTANCE = new SnappyCompressorDecompressor(); + public static final SnappyCompressorDecompressor INSTANCE = new SnappyCompressorDecompressor(); private SnappyCompressorDecompressor() { - } @Override @@ -40,6 +39,16 @@ public class SnappyCompressorDecompressor implements ICompressorDecompressor { return Snappy.maxCompressedLength(uncompressedBufferSize); } + @Override + public int compress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset) + throws HyracksDataException { + try { + return Snappy.compress(src, srcOffset, srcLen, dest, destOffset); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + @Override public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException { try { @@ -52,6 +61,16 @@ public class SnappyCompressorDecompressor implements ICompressorDecompressor { } } + @Override + public int uncompress(byte[] src, int srcOffset, int srcLen, byte[] dest, int destOffset) + throws HyracksDataException { + try { + return Snappy.uncompress(src, srcOffset, srcLen, dest, destOffset); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + @Override public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException { try {
