http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java new file mode 100644 index 0000000..a913513 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCacheHeaderHelper.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.common.buffercache; + +import static org.apache.hyracks.storage.common.buffercache.IBufferCache.RESERVED_HEADER_BYTES; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.compression.ICompressorDecompressor; +/** Reads and writes the reserved page header (frame size multiplier at offset 0, extra block page id at offset 4) through a reusable scratch buffer. */ +public class BufferCacheHeaderHelper { + private static final int FRAME_MULTIPLIER_OFF = 0; + private static final int EXTRA_BLOCK_PAGE_ID_OFF = FRAME_MULTIPLIER_OFF + 4; // 4 + + private final ByteBuffer[] array; + private final int pageSizeWithHeader; + private ByteBuffer buf; + + public BufferCacheHeaderHelper(int pageSize) { + this.pageSizeWithHeader = RESERVED_HEADER_BYTES + pageSize; + buf = ByteBuffer.allocate(pageSizeWithHeader); + array = new ByteBuffer[] { buf, null }; + } + /** Stores cPage's header fields in the scratch buffer, limits it to the reserved header bytes, and returns the pair {scratch buffer, cPage.buffer}. */ + public ByteBuffer[] prepareWrite(CachedPage cPage) { + setPageInfo(cPage); + buf.position(0); + buf.limit(RESERVED_HEADER_BYTES); + array[1] = cPage.buffer; + return array; + } + /** Grows the scratch buffer if needed, stores cPage's header fields, and returns the buffer positioned just past the reserved header with its limit at capacity. */ + public ByteBuffer prepareWrite(CachedPage cPage, int requiredSize) { + ensureBufferCapacity(requiredSize); + setPageInfo(cPage); + buf.position(RESERVED_HEADER_BYTES); + buf.limit(buf.capacity()); + return buf; + } + /** Returns the scratch buffer rewound to position 0 with its limit set to size. */ + public ByteBuffer prepareRead(int size) { + buf.position(0); + buf.limit(size); + return buf; + } + /** Copies the frame size multiplier and extra block page id from the scratch buffer into cPage and returns the buffer positioned just past the reserved header. */ + public ByteBuffer processHeader(CachedPage cPage) { + cPage.setFrameSizeMultiplier(buf.getInt(FRAME_MULTIPLIER_OFF)); + cPage.setExtraBlockPageId(buf.getInt(EXTRA_BLOCK_PAGE_ID_OFF)); + buf.position(RESERVED_HEADER_BYTES); + return buf; + } + /** Writes cPage's frame size multiplier and extra block page id at their fixed header offsets using absolute puts. */ + private void setPageInfo(CachedPage cPage) { + buf.putInt(FRAME_MULTIPLIER_OFF, cPage.getFrameSizeMultiplier()); + buf.putInt(EXTRA_BLOCK_PAGE_ID_OFF, cPage.getExtraBlockPageId()); + } + + /** + * {@link ICompressorDecompressor#compress(ByteBuffer, ByteBuffer)} may require additional + * space to do the compression. See {@link ICompressorDecompressor#computeCompressedBufferSize(int)}.
+ * + * @param size + * payload capacity needed in bytes; RESERVED_HEADER_BYTES is added on top before comparing with the current capacity + */ + private void ensureBufferCapacity(int size) { + final int requiredSize = size + RESERVED_HEADER_BYTES; + if (buf.capacity() < requiredSize) { + buf = ByteBuffer.allocate(requiredSize); + array[0] = buf; + } + buf.limit(buf.capacity()); + } +}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java index 6ec12aa..cf0553c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/CachedPage.java @@ -45,6 +45,8 @@ public class CachedPage implements ICachedPageInternal { private IQueueInfo queueInfo; private int multiplier; private int extraBlockPageId; + private long compressedOffset; + private int compressedSize; // DEBUG private static final boolean DEBUG = false; private final StackTraceElement[] ctorStack; @@ -224,4 +226,23 @@ public class CachedPage implements ICachedPageInternal { LOGGER.error("An IO Failure took place but the failure callback is not set", e); } } + /* Plain accessors for the compressedOffset and compressedSize fields. NOTE(review): this first setter has no @Override although its three siblings do and ICachedPageInternal declares it; confirm whether that is intentional. */ + public void setCompressedPageOffset(long offset) { + this.compressedOffset = offset; + } + + @Override + public long getCompressedPageOffset() { + return compressedOffset; + } + + @Override + public void setCompressedPageSize(int size) { + this.compressedSize = size; + } + + @Override + public int getCompressedPageSize() { + return compressedSize; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java ---------------------------------------------------------------------- diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java index 21d3677..c762dd9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IBufferCache.java @@ -21,6 +21,7 @@ package org.apache.hyracks.storage.common.buffercache; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.replication.IIOReplicationManager; +import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter; public interface IBufferCache { @@ -278,4 +279,11 @@ public interface IBufferCache { */ void closeFileIfOpen(FileReference fileRef); + /** + * @return compressed page writer; this default implementation always throws {@link UnsupportedOperationException} + */ + default ICompressedPageWriter getCompressedPageWriter(int fileId) { + throw new UnsupportedOperationException(this.getClass().getName() + " does not support compressed pages"); + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java index d900852..04e93db 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java +++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ICachedPageInternal.java @@ -35,4 +35,12 @@ public interface ICachedPageInternal extends ICachedPage { int getExtraBlockPageId(); void setExtraBlockPageId(int extraBlockPageId); + /* Offset and size of the page's compressed image; CompressedFileManager fills both from a LAF entry. */ + void setCompressedPageOffset(long offset); + + long getCompressedPageOffset(); + + void setCompressedPageSize(int size); + + int getCompressedPageSize(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java new file mode 100644 index 0000000..c4855bd --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressor.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.common.compression; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.compression.ICompressorDecompressor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +/** Pass-through ICompressorDecompressor: compress and uncompress return their first argument untouched, and computeCompressedBufferSize reports 0. */ +public class NoOpCompressorDecompressor implements ICompressorDecompressor { + public static final NoOpCompressorDecompressor INSTANCE = new NoOpCompressorDecompressor(); + + private NoOpCompressorDecompressor() { + } + + @Override + public int computeCompressedBufferSize(int uBufferSize) { + return 0; + } + + @Override + public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException { + return uBuffer; + } + + @Override + public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException { + return cBuffer; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java new file mode 100644 index 0000000..690f4a2 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/NoOpCompressorDecompressorFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.common.compression; + +import org.apache.hyracks.api.compression.ICompressorDecompressor; +import org.apache.hyracks.api.compression.ICompressorDecompressorFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.IJsonSerializable; +import org.apache.hyracks.api.io.IPersistedResourceRegistry; + +import com.fasterxml.jackson.databind.JsonNode; +/** Factory with no instance fields: createInstance() returns NoOpCompressorDecompressor.INSTANCE and fromJson() returns this factory's own INSTANCE. */ +public class NoOpCompressorDecompressorFactory implements ICompressorDecompressorFactory { + private static final long serialVersionUID = 1L; + public static final ICompressorDecompressorFactory INSTANCE = new NoOpCompressorDecompressorFactory(); + + @Override + public ICompressorDecompressor createInstance() { + return NoOpCompressorDecompressor.INSTANCE; + } + + @Override + public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException { + return registry.getClassIdentifier(getClass(), serialVersionUID); + } + + @SuppressWarnings("squid:S1172") // unused parameter + public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java
---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java new file mode 100644 index 0000000..16c9a2d --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.common.compression; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.compression.ICompressorDecompressor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.xerial.snappy.Snappy; + +/** + * Built-in Snappy compressor/decompressor wrapper. Both operations work on the buffers' backing arrays from position() onward and set the output buffer's limit to position() plus the produced length, so array-backed buffers are required. NOTE(review): arrayOffset() is not added to the array indices; confirm sliced buffers are never passed in. + */ +public class SnappyCompressorDecompressor implements ICompressorDecompressor { + protected static final SnappyCompressorDecompressor INSTANCE = new SnappyCompressorDecompressor(); + + private SnappyCompressorDecompressor() { + + } + + @Override + public int computeCompressedBufferSize(int uncompressedBufferSize) { + return Snappy.maxCompressedLength(uncompressedBufferSize); + } + + @Override + public ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException { + try { + final int cLength = Snappy.compress(uBuffer.array(), uBuffer.position(), uBuffer.remaining(), + cBuffer.array(), cBuffer.position()); + cBuffer.limit(cBuffer.position() + cLength); + return cBuffer; + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + + @Override + public ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException { + try { + final int uLength = Snappy.uncompress(cBuffer.array(), cBuffer.position(), cBuffer.remaining(), + uBuffer.array(), uBuffer.position()); + uBuffer.limit(uBuffer.position() + uLength); + return uBuffer; + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressorFactory.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressorFactory.java new file mode 100644 index 0000000..93f31bd --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/SnappyCompressorDecompressorFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.common.compression; + +import org.apache.hyracks.api.compression.ICompressorDecompressor; +import org.apache.hyracks.api.compression.ICompressorDecompressorFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.IJsonSerializable; +import org.apache.hyracks.api.io.IPersistedResourceRegistry; + +import com.fasterxml.jackson.databind.JsonNode; +/** Factory that always returns the shared SnappyCompressorDecompressor.INSTANCE. NOTE(review): INSTANCE is private here but public in NoOpCompressorDecompressorFactory; confirm the difference is intended. */ +public class SnappyCompressorDecompressorFactory implements ICompressorDecompressorFactory { + private static final long serialVersionUID = 1L; + private static final ICompressorDecompressorFactory INSTANCE = new SnappyCompressorDecompressorFactory(); + + @Override + public ICompressorDecompressor createInstance() { + return SnappyCompressorDecompressor.INSTANCE; + } + + @Override + public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException { + return registry.getClassIdentifier(getClass(), serialVersionUID); + } + + @SuppressWarnings("squid:S1172") // unused parameter + public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) { + return INSTANCE; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java new file mode 100644 index 0000000..70d1388 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileManager.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.common.compression.file; + +import java.nio.ByteBuffer; +import java.util.EnumSet; + +import org.apache.hyracks.api.compression.ICompressorDecompressor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; +import org.apache.hyracks.storage.common.buffercache.ICachedPageInternal; +import org.apache.hyracks.storage.common.file.BufferedFileHandle; + +/** + * CompressedFileManager is responsible for managing the Look Aside File (LAF file), which contains + * the compression information. The LAF file format is as follows: + * + * [<offset0, size0>, <offset1, size1> .... <offsetN, sizeN>] + * Each entry <offsetM, sizeM> is a 16-byte entry for page M (8 bytes for offset and 8 for size). + * + * The file stores the beginning and the size of each page after compression.
+ */ +public class CompressedFileManager { + protected static final int SIZE_ENTRY_OFFSET = 8; // 0 is for the compressed page offset + protected static final int ENTRY_LENGTH = 16; //<offset(8-bytes),size(8-bytes)> + protected static final int EOF = -1; + private static final EnumSet<State> CLOSED = EnumSet.of(State.CLOSED); + private static final EnumSet<State> READABLE_WRITABLE = EnumSet.of(State.READABLE, State.WRITABLE); + private static final EnumSet<State> READABLE = EnumSet.of(State.READABLE); + private static final EnumSet<State> WRITABLE = EnumSet.of(State.WRITABLE); + + private enum State { + READABLE, + WRITABLE, + CLOSED + } + + private final IBufferCache bufferCache; + private final int fileId; + private final ICompressorDecompressor compressorDecompressor; + + private State state; + private int totalNumOfPages; + + private LAFWriter lafWriter; + + public CompressedFileManager(IBufferCache bufferCache, int fileId, CompressedFileReference fileRef) { + state = State.CLOSED; + totalNumOfPages = 0; + this.bufferCache = bufferCache; + this.fileId = fileId; + this.compressorDecompressor = fileRef.getCompressorDecompressor(); + } + + /** + * If the file is empty (i.e. the number of pages is zero), + * the state becomes WRITABLE; otherwise it becomes READABLE and the page count is computed. + * + * @throws HyracksDataException + */ + public void open() throws HyracksDataException { + ensureState(CLOSED); + changeToFunctionalState(); + } + + /** + * Close the LAF file. + * + * @throws IllegalStateException if the manager is neither READABLE nor WRITABLE + */ + public void close() { + ensureState(READABLE_WRITABLE); + state = State.CLOSED; + } + + /* ************************ + * LAF writing methods + * ************************ + */ + + public ICompressedPageWriter getCompressedPageWriter() { + ensureState(WRITABLE); + return lafWriter; + } + + /** + * Add page information (offset, size) after compression. + * + * @param dpid disk page id; only its page-id part is used + * @param size size of the page after compression + * @return offset for the compressed page.
+ * @throws HyracksDataException + */ + public long writePageInfo(long dpid, long size) throws HyracksDataException { + final int pageId = BufferedFileHandle.getPageId(dpid); + //Write the page (extraPageIndex = 0) + return writeExtraPageInfo(pageId, size, 0); + } + + /** + * Add extra page information (offset, size) after compression. + * + * @param extraPageId + * extra page ID + * @param size + * size of the extra page + * @param extraPageIndex + * the index of the extra page (starting from 0) + * @return offset for the compressed page. + * @throws HyracksDataException + */ + public long writeExtraPageInfo(int extraPageId, long size, int extraPageIndex) throws HyracksDataException { + ensureState(WRITABLE); + + final long compressedPageOffset; + try { + compressedPageOffset = lafWriter.writePageInfo(extraPageId + extraPageIndex, size); + } catch (HyracksDataException e) { + lafWriter.abort(); + throw e; + } + + return compressedPageOffset; + } + + /** + * This method is used by {@link LAFWriter#endWriting()} to signal the end of writing. + * After calling this method, the LAF file will be read-only (state READABLE).
+ * + * @param totalNumOfPages + * The total number of pages of the index + * @throws HyracksDataException + */ + void endWriting(int totalNumOfPages) { + ensureState(WRITABLE); + this.totalNumOfPages = totalNumOfPages; + lafWriter = null; + state = State.READABLE; + } + + /* ************************ + * LAF reading methods + * ************************ + */ + + /** + * Set the compressed page offset and size + * + * @param compressedPage page whose disk page id selects the LAF entry + * @throws HyracksDataException + */ + public void setCompressedPageInfo(ICachedPageInternal compressedPage) throws HyracksDataException { + setCompressedPageInfo(BufferedFileHandle.getPageId(compressedPage.getDiskPageId()), compressedPage); + } + + /** + * Set the extra compressed page offset and size + * + * @param compressedPage page whose extra block page id is the base of the LAF entry + * @param extraPageIndex index added to the extra block page id + * @throws HyracksDataException + */ + public void setExtraCompressedPageInfo(ICachedPageInternal compressedPage, int extraPageIndex) + throws HyracksDataException { + setCompressedPageInfo(compressedPage.getExtraBlockPageId() + extraPageIndex, compressedPage); + } + + /* ************************ + * LAF general methods + * ************************ + */ + + /** + * Get the number of compressed pages + * + * @return total number of compressed pages + */ + public int getNumberOfPages() { + return totalNumOfPages; + } + + public int getFileId() { + return fileId; + } + + public ICompressorDecompressor getCompressorDecompressor() { + return compressorDecompressor; + } + + /* ************************ + * Private methods + * ************************ + */ + + private void ensureState(EnumSet<State> expectedStates) { + if (!expectedStates.contains(state)) { + throw new IllegalStateException( + "Expecting the state to be " + expectedStates + ". 
Currently it is " + state); + } + } + + private void changeToFunctionalState() throws HyracksDataException { + if (bufferCache.getNumPagesOfFile(fileId) == 0) { + state = State.WRITABLE; + lafWriter = new LAFWriter(this, bufferCache); + } else { + state = State.READABLE; + init(); + } + } + + private void init() throws HyracksDataException { + final int numOfPages = bufferCache.getNumPagesOfFile(fileId); + //Maximum number of entries in a page + final int numOfEntriesPerPage = bufferCache.getPageSize() / ENTRY_LENGTH; + //get the last page which may contain fewer entries than numOfEntriesPerPage + final long dpid = getDiskPageId(numOfPages - 1); + final ICachedPage page = bufferCache.pin(dpid, false); + try { + final ByteBuffer buf = page.getBuffer(); + + //Start at 1 since it is impossible to have EOF at the first entry of a page + int i = 1; + //Seek EOF and count number of entries + while (i < numOfEntriesPerPage && buf.getLong(i * ENTRY_LENGTH) != EOF) { + i++; + } + + totalNumOfPages = (numOfPages - 1) * numOfEntriesPerPage + i; + } finally { + bufferCache.unpin(page); + } + } + /* NOTE(review): compressedPageId * ENTRY_LENGTH is evaluated in int arithmetic here and in setCompressedPageInfo; confirm page ids stay small enough not to overflow. */ + private ICachedPage pinAndGetPage(int compressedPageId) throws HyracksDataException { + final int pageId = compressedPageId * ENTRY_LENGTH / bufferCache.getPageSize(); + return bufferCache.pin(getDiskPageId(pageId), false); + } + + private long getDiskPageId(int pageId) { + return BufferedFileHandle.getDiskPageId(fileId, pageId); + } + + private void setCompressedPageInfo(int compressedPageId, ICachedPageInternal compressedPage) + throws HyracksDataException { + ensureState(READABLE); + if (totalNumOfPages == 0) { + /* + * It seems it is legal to pin an empty file. + * Return the page information as if the page were not compressed. + */ + compressedPage.setCompressedPageOffset(0); + compressedPage.setCompressedPageSize(bufferCache.getPageSize()); + return; + } + final ICachedPage page = pinAndGetPage(compressedPageId); + try { + // No need for read latches as pages are immutable.
+ final ByteBuffer buf = page.getBuffer(); + final int entryOffset = compressedPageId * ENTRY_LENGTH % bufferCache.getPageSize(); + compressedPage.setCompressedPageOffset(buf.getLong(entryOffset)); + compressedPage.setCompressedPageSize((int) buf.getLong(entryOffset + SIZE_ENTRY_OFFSET)); + } finally { + bufferCache.unpin(page); + } + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java new file mode 100644 index 0000000..3fb682c --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/CompressedFileReference.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.hyracks.storage.common.compression.file; + +import java.util.Objects; + +import org.apache.hyracks.api.compression.ICompressorDecompressor; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IODeviceHandle; +/** A FileReference that also carries the reference to its Look Aside File (LAF) on the same device and the compressor/decompressor passed at construction. */ +public class CompressedFileReference extends FileReference { + private static final long serialVersionUID = 1L; + + private final String lafPath; + private final FileReference lafFileRef; + private final transient ICompressorDecompressor compressorDecompressor; + + public CompressedFileReference(IODeviceHandle dev, ICompressorDecompressor compressorDecompressor, String path, + String lafPath) { + super(dev, path); + this.lafPath = lafPath; + lafFileRef = new FileReference(dev, lafPath); + this.compressorDecompressor = compressorDecompressor; + } + + public FileReference getLAFFileReference() { + return lafFileRef; + } + + public ICompressorDecompressor getCompressorDecompressor() { + return compressorDecompressor; + } + /* NOTE(review): && short-circuits, so super.delete() is not attempted when deleting the LAF file returns false; confirm this is intended. */ + @Override + public boolean delete() { + return lafFileRef.delete() && super.delete(); + } + + @Override + public boolean isCompressed() { + return true; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof CompressedFileReference)) { + return false; + } + return super.equals(o) && lafPath.equals(((CompressedFileReference) o).lafPath); + } + + @Override + public int hashCode() { + return Objects.hash(super.getRelativePath(), lafPath); + } + + /** + * @return the relative path for LAF file + */ + public String getLAFRelativePath() { + return lafPath; + } + + /** + * @return the absolute path for LAF file + */ + public String getLAFAbsolutePath() { + return lafFileRef.getAbsolutePath(); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/ICompressedPageWriter.java ----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/ICompressedPageWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/ICompressedPageWriter.java new file mode 100644 index 0000000..86525b0 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/ICompressedPageWriter.java @@ -0,0 +1,48 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hyracks.storage.common.compression.file;

import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;

/**
 * An interface that exposes the Look Aside File (LAF) writer to the indexes.
 */
public interface ICompressedPageWriter {
    /**
     * Prepares the writer for a page. The index has to call this before it writes
     * a compressed page.
     *
     * @param cPage
     *            the page that is about to be written
     * @throws HyracksDataException
     *             if the writer could not be prepared for the page
     */
    void prepareWrite(ICachedPage cPage) throws HyracksDataException;

    /**
     * Signal the writer to abort.
     */
    void abort();

    /**
     * Finalize the writing of the compressed pages.
     *
     * @throws HyracksDataException
     *             if the writing could not be finalized
     */
    void endWriting() throws HyracksDataException;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java new file mode 100644 index 0000000..dcccc52 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/LAFWriter.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */
package org.apache.hyracks.storage.common.compression.file;

import static org.apache.hyracks.storage.common.compression.file.CompressedFileManager.ENTRY_LENGTH;
import static org.apache.hyracks.storage.common.compression.file.CompressedFileManager.EOF;
import static org.apache.hyracks.storage.common.compression.file.CompressedFileManager.SIZE_ENTRY_OFFSET;

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;

import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.ICachedPageInternal;
import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue;
import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
import org.apache.hyracks.util.annotations.NotThreadSafe;

/**
 * Look Aside File (LAF) writer.
 * This class is called by two threads simultaneously:
 * - a thread that prepares the LAF pages (bulk-loader)
 * - and a writer thread that writes the LAF entries (buffer cache writer thread)
 * Hence, it is not safe to have more than one thread preparing or more than one thread
 * writing LAF pages. State shared by the two threads ({@code cachedFrames} and
 * {@code recycledFrames}) is only touched while holding the monitor of {@code cachedFrames}.
 */
@NotThreadSafe
class LAFWriter implements ICompressedPageWriter {
    private final CompressedFileManager compressedFileManager;
    private final IBufferCache bufferCache;
    private final IFIFOPageQueue queue;
    //LAF page ID -> confiscated LAF page that has not been queued for writing yet
    private final Map<Integer, LAFFrame> cachedFrames;
    //Frame objects whose page has been queued; reused to avoid re-allocation
    private final Queue<LAFFrame> recycledFrames;
    private final int fileId;
    //Number of entries that fit in one LAF page
    private final int maxNumOfEntries;
    private final PageWriteFailureCallback callBack;
    //The frame (and its LAF page ID) the writer thread is currently filling
    private LAFFrame currentFrame;
    private int currentPageId;
    private int maxPageId;

    //Offset at which the next compressed page is going to be placed
    private long lastOffset;
    private int totalNumOfPages;

    public LAFWriter(CompressedFileManager compressedFileManager, IBufferCache bufferCache) {
        this.compressedFileManager = compressedFileManager;
        this.bufferCache = bufferCache;
        queue = bufferCache.createFIFOQueue();
        cachedFrames = new HashMap<>();
        recycledFrames = new ArrayDeque<>();
        this.fileId = compressedFileManager.getFileId();
        callBack = new PageWriteFailureCallback();

        maxNumOfEntries = bufferCache.getPageSize() / ENTRY_LENGTH;
        lastOffset = 0;
        totalNumOfPages = 0;
        maxPageId = -1;
        currentPageId = -1;
    }

    /* ************************************
     * ICompressedPageWriter methods
     * Called by non-BufferCache thread (Bulk-loader)
     * ************************************
     */

    /**
     * Makes sure that every LAF page which will receive an entry for {@code cPage}
     * (including the entries of its extra pages) is confiscated and cached.
     * On failure, the pages confiscated so far are returned before the exception is rethrown.
     */
    @Override
    public void prepareWrite(ICachedPage cPage) throws HyracksDataException {
        final ICachedPageInternal internalPage = (ICachedPageInternal) cPage;
        final int entryPageId = getLAFEntryPageId(BufferedFileHandle.getPageId(internalPage.getDiskPageId()));

        synchronized (cachedFrames) {
            if (!cachedFrames.containsKey(entryPageId)) {
                try {
                    //Writing new page(s). Confiscate the page(s) from the buffer cache.
                    prepareFrames(entryPageId, internalPage);
                } catch (HyracksDataException e) {
                    abort();
                    throw e;
                }
            }
        }
    }

    private void prepareFrames(int entryPageId, ICachedPageInternal cPage) throws HyracksDataException {
        //Confiscate the first page
        confiscatePage(entryPageId);
        //Check if the entries of the extra pages span to the next LAF page(s)
        for (int i = 0; i < cPage.getFrameSizeMultiplier() - 1; i++) {
            final int extraEntryPageId = getLAFEntryPageId(cPage.getExtraBlockPageId() + i);
            if (!cachedFrames.containsKey(extraEntryPageId)) {
                confiscatePage(extraEntryPageId);
            }
        }
    }

    private void confiscatePage(int pageId) throws HyracksDataException {
        //Writing new page. Confiscate the page from the buffer cache.
        final ICachedPage newPage = bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, pageId));
        cachedFrames.put(pageId, getLAFFrame(newPage));
        maxPageId = Math.max(maxPageId, pageId);
    }

    private LAFFrame getLAFFrame(ICachedPage cPage) {
        //Prefer a recycled frame over allocating a new one
        LAFFrame lafFrame = recycledFrames.poll();
        if (lafFrame == null) {
            lafFrame = new LAFFrame();
        }
        lafFrame.setCachedPage(cPage);
        return lafFrame;
    }

    /**
     * Queues all the remaining (partially filled) LAF pages for writing, flushes the queue and
     * signals the {@link CompressedFileManager} that writing is done.
     *
     * @throws HyracksDataException
     *             if a page write has failed, either before this call or while flushing
     */
    @Override
    public void endWriting() throws HyracksDataException {
        if (callBack.hasFailed()) {
            //if write failed, return confiscated pages
            abort();
            throw HyracksDataException.create(callBack.getFailure());
        }
        synchronized (cachedFrames) {
            final LAFFrame lastPage = cachedFrames.get(maxPageId);
            if (lastPage != null && !lastPage.isFull()) {
                /*
                 * The last page may or may not be filled. In case it is not filled (i.e., it does not
                 * have the max number of entries), write an indicator after the last entry.
                 * If it has already been written (i.e., lastPage == null), it has been filled.
                 * NOTE(review): a last page that was prepared but never received an entry would get
                 * the indicator at (-1 + ENTRY_LENGTH); confirm that this cannot happen.
                 */
                lastPage.setEOF();
            }
            for (Entry<Integer, LAFFrame> entry : cachedFrames.entrySet()) {
                queue.put(entry.getValue().cPage, callBack);
            }
            /*
             * The pages were handed over to the write queue. Drop them from the cache (as
             * writeFullPage() does) so a later abort() cannot return pages this writer no longer owns.
             */
            cachedFrames.clear();
            bufferCache.finishQueue();

            //A failure while writing the last LAF pages must not go unnoticed
            if (callBack.hasFailed()) {
                throw HyracksDataException.create(callBack.getFailure());
            }

            //Signal the compressedFileManager to change its state
            compressedFileManager.endWriting(totalNumOfPages);
        }
    }

    /**
     * Returns every confiscated LAF page that has not been queued for writing to the buffer cache.
     * Calling it again is a no-op, as the returned pages are forgotten.
     */
    @Override
    public void abort() {
        synchronized (cachedFrames) {
            for (Entry<Integer, LAFFrame> frame : cachedFrames.entrySet()) {
                bufferCache.returnPage(frame.getValue().cPage);
            }
            //Several failure paths call abort() before rethrowing; never return a page twice
            cachedFrames.clear();
        }
    }

    /* ************************************
     * Local methods:
     * Called by BufferCache writer thread
     * ************************************
     */

    /**
     * Records the offset and the size of a compressed page in its LAF entry.
     *
     * @param pageId
     *            the ID of the compressed page
     * @param size
     *            the number of bytes the page occupies in the file
     * @return the offset assigned to the page
     * @throws HyracksDataException
     *             if the filled LAF page could not be queued for writing
     */
    public long writePageInfo(int pageId, long size) throws HyracksDataException {
        final LAFFrame frame = getPageBuffer(pageId);

        final long pageOffset = lastOffset;
        frame.writePageInfo(pageId, pageOffset, size);
        lastOffset += size;
        totalNumOfPages++;

        writeFullPage();
        return pageOffset;
    }

    private LAFFrame getPageBuffer(int compressedPageId) {
        final int pageId = getLAFEntryPageId(compressedPageId);

        //Fast path: the entry belongs to the LAF page that is currently being filled
        if (currentPageId == pageId) {
            return currentFrame;
        }

        final LAFFrame frame;
        synchronized (cachedFrames) {
            //Check if the frame is cached
            frame = cachedFrames.get(pageId);
            if (frame == null) {
                //Trying to write unprepared page
                abort();
                throw new IllegalStateException("Unprepared compressed-write for page ID: " + pageId);
            }
        }

        currentFrame = frame;
        currentPageId = pageId;
        return frame;
    }

    private void writeFullPage() throws HyracksDataException {
        if (currentFrame.isFull()) {
            //The LAF page is filled. We do not need to keep it.
            //Write it to the file and remove it from the cachedFrames map
            queue.put(currentFrame.cPage, callBack);
            synchronized (cachedFrames) {
                //Recycle the frame
                final LAFFrame frame = cachedFrames.remove(currentPageId);
                frame.setCachedPage(null);
                recycledFrames.add(frame);
            }
            currentFrame = null;
            currentPageId = -1;
        }
    }

    /**
     * @return the ID of the LAF page that holds the entry of the given compressed page
     */
    private int getLAFEntryPageId(int compressedPageId) {
        return compressedPageId * ENTRY_LENGTH / bufferCache.getPageSize();
    }

    /**
     * A confiscated LAF page along with the bookkeeping needed to know when it is full
     * and where the end-of-file indicator goes.
     */
    private class LAFFrame {
        private ICachedPage cPage;
        private int numOfEntries;
        private int maxEntryOffset;

        public void setCachedPage(ICachedPage cPage) {
            this.cPage = cPage;
            numOfEntries = 0;
            maxEntryOffset = -1;
        }

        public void writePageInfo(int compressedPageId, long offset, long size) {
            final int entryOffset = compressedPageId * ENTRY_LENGTH % bufferCache.getPageSize();
            //Put page offset
            cPage.getBuffer().putLong(entryOffset, offset);
            //Put page size
            cPage.getBuffer().putLong(entryOffset + SIZE_ENTRY_OFFSET, size);
            //Keep the max entry offset to set EOF (if needed)
            maxEntryOffset = Math.max(maxEntryOffset, entryOffset);
            numOfEntries++;
        }

        public void setEOF() {
            //The indicator takes the slot right after the last written entry
            cPage.getBuffer().putLong(maxEntryOffset + ENTRY_LENGTH, EOF);
        }

        public boolean isFull() {
            return numOfEntries == maxNumOfEntries;
        }
    }

}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/NoOpLAFWriter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/NoOpLAFWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/NoOpLAFWriter.java new file mode 100644 index 0000000..8f957a9 --- /dev/null +++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/compression/file/NoOpLAFWriter.java @@ -0,0 +1,44 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hyracks.storage.common.compression.file;

import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;

/**
 * An {@link ICompressedPageWriter} whose operations do nothing. Only a single shared
 * instance exists.
 */
public class NoOpLAFWriter implements ICompressedPageWriter {
    /** The shared instance. */
    public static final NoOpLAFWriter INSTANCE = new NoOpLAFWriter();

    /**
     * Misspelled name of {@link #INSTANCE}; refers to the very same object. It is kept only
     * because existing callers use it. New code should use {@link #INSTANCE}.
     */
    public static final NoOpLAFWriter INSTACNE = INSTANCE;

    private NoOpLAFWriter() {
    }

    @Override
    public void prepareWrite(ICachedPage cPage) throws HyracksDataException {
        //NoOp
    }

    @Override
    public void abort() {
        //NoOp
    }

    @Override
    public void endWriting() throws HyracksDataException {
        //NoOp
    }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java index 4f15588..11862dc 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/BufferedFileHandle.java @@ -18,18 +18,32 @@ */ package org.apache.hyracks.storage.common.file; -import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.hyracks.storage.common.buffercache.BufferCache.DEBUG; -import org.apache.hyracks.api.io.IFileHandle; +import java.nio.ByteBuffer; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; -public class BufferedFileHandle { +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.storage.common.buffercache.AbstractBufferedFileIOManager; +import org.apache.hyracks.storage.common.buffercache.BufferCache; +import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper; +import org.apache.hyracks.storage.common.buffercache.CachedPage; +import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy; +import org.apache.hyracks.storage.common.compression.file.CompressedFileReference; +import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter; +import org.apache.hyracks.storage.common.compression.file.NoOpLAFWriter; + +public class BufferedFileHandle extends AbstractBufferedFileIOManager { private final int fileId; - private volatile IFileHandle handle; private final AtomicInteger refCount; - public BufferedFileHandle(int fileId, IFileHandle handle) { + protected BufferedFileHandle(int fileId, BufferCache bufferCache, IIOManager ioManager, + BlockingQueue<BufferCacheHeaderHelper> headerPageCache, 
IPageReplacementStrategy pageReplacementStrategy) { + super(bufferCache, ioManager, headerPageCache, pageReplacementStrategy); this.fileId = fileId; - this.handle = handle; refCount = new AtomicInteger(); } @@ -37,22 +51,6 @@ public class BufferedFileHandle { return fileId; } - public void setFileHandle(IFileHandle fileHandle) { - this.handle = fileHandle; - } - - public IFileHandle getFileHandle() { - return handle; - } - - public void markAsDeleted() { - handle = null; - } - - public boolean fileHasBeenDeleted() { - return handle == null; - } - public int incReferenceCount() { return refCount.incrementAndGet(); } @@ -69,6 +67,86 @@ public class BufferedFileHandle { return getDiskPageId(fileId, pageId); } + @Override + public void read(CachedPage cPage) throws HyracksDataException { + final BufferCacheHeaderHelper header = checkoutHeaderHelper(); + try { + long bytesRead = + readToBuffer(header.prepareRead(bufferCache.getPageSizeWithHeader()), getFirstPageOffset(cPage)); + + if (!verifyBytesRead(bufferCache.getPageSizeWithHeader(), bytesRead)) { + return; + } + + final ByteBuffer buf = header.processHeader(cPage); + cPage.getBuffer().put(buf); + } finally { + returnHeaderHelper(header); + } + + readExtraPages(cPage); + } + + private void readExtraPages(CachedPage cPage) throws HyracksDataException { + final int totalPages = cPage.getFrameSizeMultiplier(); + if (totalPages > 1) { + pageReplacementStrategy.fixupCapacityOnLargeRead(cPage); + cPage.getBuffer().position(bufferCache.getPageSize()); + cPage.getBuffer().limit(totalPages * bufferCache.getPageSize()); + readToBuffer(cPage.getBuffer(), getExtraPageOffset(cPage)); + } + } + + @Override + protected void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, int extraBlockPageId) + throws HyracksDataException { + final ByteBuffer buf = cPage.getBuffer(); + final boolean contiguousLargePages = getPageId(cPage.getDiskPageId()) + 1 == extraBlockPageId; + long bytesWritten; + try { + 
buf.limit(contiguousLargePages ? bufferCache.getPageSize() * totalPages : bufferCache.getPageSize()); + buf.position(0); + bytesWritten = writeToFile(header.prepareWrite(cPage), getFirstPageOffset(cPage)); + } finally { + returnHeaderHelper(header); + } + + if (totalPages > 1 && !contiguousLargePages) { + buf.limit(totalPages * bufferCache.getPageSize()); + bytesWritten += writeToFile(buf, getExtraPageOffset(cPage)); + } + + final int expectedWritten = bufferCache.getPageSizeWithHeader() + bufferCache.getPageSize() * (totalPages - 1); + verifyBytesWritten(expectedWritten, bytesWritten); + } + + @Override + public int getNumberOfPages() { + if (DEBUG) { + assert getFileSize() % bufferCache.getPageSizeWithHeader() == 0; + } + return (int) (getFileSize() / bufferCache.getPageSizeWithHeader()); + } + + @Override + public ICompressedPageWriter getCompressedPageWriter() { + return NoOpLAFWriter.INSTACNE; + } + + @Override + protected long getFirstPageOffset(CachedPage cPage) { + return getPageOffset(getPageId(cPage.getDiskPageId())); + } + + @Override + protected long getExtraPageOffset(CachedPage cPage) { + return getPageOffset(cPage.getExtraBlockPageId()); + } + + private long getPageOffset(long pageId) { + return pageId * bufferCache.getPageSizeWithHeader(); + } + public static long getDiskPageId(int fileId, int pageId) { return (((long) fileId) << 32) + pageId; } @@ -80,4 +158,15 @@ public class BufferedFileHandle { public static int getPageId(long dpid) { return (int) dpid; } + + public static BufferedFileHandle create(FileReference fileRef, int fileId, BufferCache bufferCache, + IIOManager ioManager, BlockingQueue<BufferCacheHeaderHelper> headerPageCache, + IPageReplacementStrategy pageReplacementStrategy) { + if (fileRef.isCompressed()) { + final CompressedFileReference cFileRef = (CompressedFileReference) fileRef; + return new CompressedBufferedFileHandle(fileId, cFileRef.getLAFFileReference(), bufferCache, ioManager, + headerPageCache, pageReplacementStrategy); 
+ } + return new BufferedFileHandle(fileId, bufferCache, ioManager, headerPageCache, pageReplacementStrategy); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java new file mode 100644 index 0000000..235e144 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
package org.apache.hyracks.storage.common.file;

import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;

import org.apache.hyracks.api.compression.ICompressorDecompressor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.common.buffercache.BufferCache;
import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
import org.apache.hyracks.storage.common.buffercache.CachedPage;
import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
import org.apache.hyracks.storage.common.compression.file.CompressedFileManager;
import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;

/**
 * A {@link BufferedFileHandle} for compressed files. Pages are compressed before they are written
 * and uncompressed after they are read. Where a page is located in the file and how many bytes it
 * occupies is looked up from, and recorded through, the {@link CompressedFileManager}.
 */
public class CompressedBufferedFileHandle extends BufferedFileHandle {
    private final FileReference lafFileRef;
    //Assigned in open(FileReference) and set to null in markAsDeleted()
    private volatile CompressedFileManager compressedFileManager;

    protected CompressedBufferedFileHandle(int fileId, FileReference lafFileRef, BufferCache bufferCache,
            IIOManager ioManager, BlockingQueue<BufferCacheHeaderHelper> headerPageCache,
            IPageReplacementStrategy pageReplacementStrategy) {
        super(fileId, bufferCache, ioManager, headerPageCache, pageReplacementStrategy);
        this.lafFileRef = lafFileRef;
    }

    /**
     * Reads the stored bytes of {@code cPage} into the header helper's buffer, processes the header
     * and then fills the page buffer, uncompressing the data when it was stored compressed.
     * Extra pages (frame size multiplier &gt; 1) are read afterwards.
     */
    @Override
    public void read(CachedPage cPage) throws HyracksDataException {
        final BufferCacheHeaderHelper header = checkoutHeaderHelper();
        try {
            //Sets the offset and the size of the stored page on cPage
            compressedFileManager.setCompressedPageInfo(cPage);
            long bytesRead = readToBuffer(header.prepareRead(cPage.getCompressedPageSize()), getFirstPageOffset(cPage));

            if (!verifyBytesRead(cPage.getCompressedPageSize(), bytesRead)) {
                return;
            }
            final ByteBuffer cBuffer = header.processHeader(cPage);
            final ByteBuffer uBuffer = cPage.getBuffer();
            fixBufferPointers(uBuffer, 0);
            /*
             * write() records getPageSizeWithHeader() as the size of a page that was stored as-is.
             * A smaller recorded size is therefore treated as a compressed page.
             */
            if (cPage.getCompressedPageSize() < bufferCache.getPageSizeWithHeader()) {
                uncompressToPageBuffer(cBuffer, uBuffer);
            } else {
                cPage.getBuffer().put(cBuffer);
            }

            final int totalPages = cPage.getFrameSizeMultiplier();
            if (totalPages > 1) {
                pageReplacementStrategy.fixupCapacityOnLargeRead(cPage);
                readExtraPages(cPage, cBuffer);
            }
        } finally {
            returnHeaderHelper(header);
        }
    }

    /**
     * Reads the extra pages of a large page one by one into their page-sized slot of the page buffer.
     * {@code cBuffer} is reused as the staging buffer for the pages that were stored compressed.
     */
    private void readExtraPages(CachedPage cPage, ByteBuffer cBuffer) throws HyracksDataException {
        final ByteBuffer uBuffer = cPage.getBuffer();

        final int totalPages = cPage.getFrameSizeMultiplier();
        for (int i = 1; i < totalPages; i++) {
            fixBufferPointers(uBuffer, i);
            //Extra pages are indexed from 0, hence (i - 1)
            compressedFileManager.setExtraCompressedPageInfo(cPage, i - 1);
            //Extra pages are stored without a header (see writeExtraCompressedPages)
            if (cPage.getCompressedPageSize() < bufferCache.getPageSize()) {
                cBuffer.position(0);
                cBuffer.limit(cPage.getCompressedPageSize());
                readToBuffer(cBuffer, getExtraPageOffset(cPage));
                cBuffer.flip();
                uncompressToPageBuffer(cBuffer, cPage.getBuffer());
            } else {
                //Stored as-is: read directly into the page buffer
                readToBuffer(uBuffer, getExtraPageOffset(cPage));
            }
        }
    }

    /**
     * Writes the first page, compressed if that yields fewer bytes than a page and as-is otherwise,
     * then writes the extra pages (if any). The header helper is always returned at the end.
     */
    @Override
    protected void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, int extraBlockPageId)
            throws HyracksDataException {
        try {
            final ByteBuffer cBuffer = header.prepareWrite(cPage, getRequiredBufferSize());
            final ByteBuffer uBuffer = cPage.getBuffer();
            final long pageId = cPage.getDiskPageId();

            final long bytesWritten;
            final long expectedBytesWritten;

            fixBufferPointers(uBuffer, 0);
            if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) {
                //Rewind so that the write starts from the beginning of the header helper's buffer
                cBuffer.position(0);
                final long offset = compressedFileManager.writePageInfo(pageId, cBuffer.remaining());
                expectedBytesWritten = cBuffer.limit();
                bytesWritten = writeToFile(cBuffer, offset);
            } else {
                //Compression did not gain any savings
                final ByteBuffer[] buffers = header.prepareWrite(cPage);
                final long offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader());
                expectedBytesWritten = buffers[0].limit() + (long) buffers[1].limit();
                bytesWritten = writeToFile(buffers, offset);
            }

            verifyBytesWritten(expectedBytesWritten, bytesWritten);

            //Write extra pages
            if (totalPages > 1) {
                writeExtraCompressedPages(cPage, cBuffer, totalPages, extraBlockPageId);
            }

        } finally {
            returnHeaderHelper(header);
        }
    }

    /**
     * Writes the extra pages of a large page one by one. Each of them is written compressed if that
     * yields fewer bytes than a page, and as-is from the page buffer otherwise.
     */
    private void writeExtraCompressedPages(CachedPage cPage, ByteBuffer cBuffer, int totalPages, int extraBlockPageId)
            throws HyracksDataException {
        final ByteBuffer uBuffer = cPage.getBuffer();
        long expectedBytesWritten = 0;
        long bytesWritten = 0;
        for (int i = 1; i < totalPages; i++) {
            fixBufferPointers(uBuffer, i);
            cBuffer.position(0);

            final ByteBuffer writeBuffer;
            if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) {
                writeBuffer = cBuffer;
            } else {
                // NOTE(review): assumes compress() leaves uBuffer's position/limit on the i-th page -- TODO confirm
                writeBuffer = uBuffer;
            }
            final int length = writeBuffer.remaining();
            //Extra pages are indexed from 0, hence (i - 1)
            final long offset = compressedFileManager.writeExtraPageInfo(extraBlockPageId, length, i - 1);
            expectedBytesWritten += length;
            bytesWritten += writeToFile(writeBuffer, offset);
        }

        verifyBytesWritten(expectedBytesWritten, bytesWritten);

    }

    /**
     * Opens the LAF file through the buffer cache and the {@link CompressedFileManager} on top of it
     * before opening the data file itself.
     */
    @Override
    public void open(FileReference fileRef) throws HyracksDataException {
        final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
        final int lafFileId = bufferCache.openFile(cFileRef.getLAFFileReference());

        compressedFileManager = new CompressedFileManager(bufferCache, lafFileId, cFileRef);
        compressedFileManager.open();
        super.open(fileRef);
    }

    /**
     * Decrement the reference counter for LAF file.
     * It is up to {@link BufferCache} to physically close the file.
     * see {@link BufferCache#deleteFile(FileReference)} and {@link BufferCache#purgeHandle(int)}
     */
    @Override
    public void close() throws HyracksDataException {
        if (hasBeenOpened()) {
            compressedFileManager.close();
            bufferCache.closeFile(compressedFileManager.getFileId());
        }
        super.close();
    }

    /**
     * Purges the data file, then closes the LAF file and purges its handle as well.
     */
    @Override
    public void purge() throws HyracksDataException {
        super.purge();
        // NOTE(review): unlike close(), there is no hasBeenOpened() guard here -- confirm purge() is only
        // called on handles that were opened and not marked as deleted
        compressedFileManager.close();
        bufferCache.closeFile(compressedFileManager.getFileId());
        bufferCache.purgeHandle(compressedFileManager.getFileId());
    }

    /**
     * Deletes the LAF file, by file ID if this handle has been opened and by file reference otherwise,
     * before marking the data file as deleted.
     */
    @Override
    public void markAsDeleted() throws HyracksDataException {
        if (hasBeenOpened()) {
            bufferCache.deleteFile(compressedFileManager.getFileId());
            compressedFileManager = null;
        } else {
            bufferCache.deleteFile(lafFileRef);
        }
        super.markAsDeleted();
    }

    /**
     * Forces the data file first and the LAF file second.
     */
    @Override
    public void force(boolean metadata) throws HyracksDataException {
        super.force(metadata);
        bufferCache.force(compressedFileManager.getFileId(), metadata);
    }

    @Override
    public int getNumberOfPages() {
        return compressedFileManager.getNumberOfPages();
    }

    @Override
    protected long getFirstPageOffset(CachedPage cPage) {
        return cPage.getCompressedPageOffset();
    }

    @Override
    protected long getExtraPageOffset(CachedPage cPage) {
        //The offset currently set on cPage; readExtraPages() refreshes it before each extra page is read
        return getFirstPageOffset(cPage);
    }

    @Override
    public ICompressedPageWriter getCompressedPageWriter() {
        return compressedFileManager.getCompressedPageWriter();
    }

    /* ********************************
     * Compression methods
     * ********************************
     */

    private void fixBufferPointers(ByteBuffer uBuffer, int i) {
        //Fix the uncompressed buffer to point at the i^th extra page
        uBuffer.position(bufferCache.getPageSize() * i);
        //Similarly, fix the limit to a page-worth of data from the i^th page
        uBuffer.limit(uBuffer.position() + bufferCache.getPageSize());
    }

    private void uncompressToPageBuffer(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException {
        final ICompressorDecompressor compDecomp = compressedFileManager.getCompressorDecompressor();
        compDecomp.uncompress(cBuffer, uBuffer);
        //The uncompressed data is expected to be exactly one page
        verifyUncompressionSize(bufferCache.getPageSize(), uBuffer.remaining());
    }

    /**
     * @return the number of bytes remaining in {@code cBuffer} after the compression
     */
    private int compressToWriteBuffer(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException {
        final ICompressorDecompressor compDecomp = compressedFileManager.getCompressorDecompressor();
        compDecomp.compress(uBuffer, cBuffer);
        return cBuffer.remaining();
    }

    /**
     * @return the buffer size the compressor asks for in order to compress one page
     */
    private int getRequiredBufferSize() {
        final ICompressorDecompressor compDecomp = compressedFileManager.getCompressorDecompressor();
        return compDecomp.computeCompressedBufferSize(bufferCache.getPageSize());
    }

    private void verifyUncompressionSize(int expected, int actual) {
        if (expected != actual) {
            throwException("Uncompressed", expected, actual);
        }
    }

}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java index aee7e90..b8727b0 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeExamplesTest.java @@ -37,6 +37,7 @@ import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree; import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness; import
org.apache.hyracks.storage.am.lsm.btree.utils.LSMBTreeUtil; import org.apache.hyracks.storage.common.IIndexAccessor; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; import org.apache.hyracks.util.trace.ITracer; import org.junit.After; import org.junit.Before; @@ -62,7 +63,8 @@ public class LSMBTreeExamplesTest extends OrderedIndexExamplesTest { bloomFilterKeyFields, harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(), harness.getOperationTracker(), harness.getIOScheduler(), harness.getIOOperationCallbackFactory(), true, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, true, - harness.getMetadataPageManagerFactory(), false, ITracer.NONE); + harness.getMetadataPageManagerFactory(), false, ITracer.NONE, + NoOpCompressorDecompressorFactory.INSTANCE); } @Before http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java index 4fafb38..d83f475 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeModificationOperationCallbackTest.java @@ -31,6 +31,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerFactory; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; import org.apache.hyracks.util.trace.ITracer; import org.junit.Test; @@ -52,7 +53,8 @@ public class LSMBTreeModificationOperationCallbackTest extends AbstractModificat harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(), NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(), harness.getIOOperationCallbackFactory(), true, null, null, null, null, true, - harness.getMetadataPageManagerFactory(), false, ITracer.NONE); + harness.getMetadataPageManagerFactory(), false, ITracer.NONE, + NoOpCompressorDecompressorFactory.INSTANCE); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java index 3dfb369..59c9ebb 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java @@ -40,6 +40,7 @@ import 
org.apache.hyracks.storage.common.IIndexAccessor; import org.apache.hyracks.storage.common.IIndexBulkLoader; import org.apache.hyracks.storage.common.IIndexCursor; import org.apache.hyracks.storage.common.ISearchOperationCallback; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; import org.apache.hyracks.util.trace.ITracer; import org.junit.Assert; import org.junit.Test; @@ -61,7 +62,8 @@ public class LSMBTreeSearchOperationCallbackTest extends AbstractSearchOperation harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(), NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(), harness.getIOOperationCallbackFactory(), true, null, null, null, null, true, - harness.getMetadataPageManagerFactory(), false, ITracer.NONE); + harness.getMetadataPageManagerFactory(), false, ITracer.NONE, + NoOpCompressorDecompressorFactory.INSTANCE); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java index 0914541..c14474b 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java @@ -41,6 +41,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerFactory; import org.apache.hyracks.storage.common.IIndexAccessor; import org.apache.hyracks.storage.common.IModificationOperationCallback; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; import org.apache.hyracks.util.trace.ITracer; import org.junit.After; import org.junit.Assert; @@ -74,7 +75,8 @@ public class LSMBTreeUpdateInPlaceTest extends AbstractOperationCallbackTest { harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(), NoOpOperationTrackerFactory.INSTANCE.getOperationTracker(null, null), harness.getIOScheduler(), harness.getIOOperationCallbackFactory(), true, null, null, null, null, true, - harness.getMetadataPageManagerFactory(), true, ITracer.NONE); + harness.getMetadataPageManagerFactory(), true, ITracer.NONE, + NoOpCompressorDecompressorFactory.INSTANCE); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java index b25a229..baa95e9 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java @@ 
-39,6 +39,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider; import org.apache.hyracks.storage.common.IStorageManager; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -57,12 +58,13 @@ public class TestLsmBtreeLocalResource extends LSMBTreeLocalResource { super(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, path, storageManager, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, - vbcProvider, ioSchedulerProvider, durable); + vbcProvider, ioSchedulerProvider, durable, NoOpCompressorDecompressorFactory.INSTANCE); } protected TestLsmBtreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields) throws HyracksDataException { - super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields); + super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields, + NoOpCompressorDecompressorFactory.INSTANCE); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java index 6b13f56..3d7c520 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java @@ -32,6 +32,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider; import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource; import org.apache.hyracks.storage.common.IStorageManager; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; public class TestLsmBtreeLocalResourceFactory extends LSMBTreeLocalResourceFactory { private static final long serialVersionUID = 1L; @@ -47,7 +48,7 @@ public class TestLsmBtreeLocalResourceFactory extends LSMBTreeLocalResourceFacto super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields, opTrackerFactory, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable, bloomFilterKeyFields, bloomFilterFalsePositiveRate, - isPrimary, btreeFields); + isPrimary, btreeFields, NoOpCompressorDecompressorFactory.INSTANCE); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java ---------------------------------------------------------------------- diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java index 6950f86..85038c7 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/multithread/LSMBTreeMultiThreadTest.java @@ -32,6 +32,7 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndex; import org.apache.hyracks.storage.am.common.datagen.ProbabilityHelper; import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness; import org.apache.hyracks.storage.am.lsm.btree.utils.LSMBTreeUtil; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; import org.apache.hyracks.util.trace.ITracer; public class LSMBTreeMultiThreadTest extends OrderedIndexMultiThreadTest { @@ -57,7 +58,8 @@ public class LSMBTreeMultiThreadTest extends OrderedIndexMultiThreadTest { harness.getFileReference(), harness.getDiskBufferCache(), typeTraits, cmpFactories, bloomFilterKeyFields, harness.getBoomFilterFalsePositiveRate(), harness.getMergePolicy(), harness.getOperationTracker(), harness.getIOScheduler(), harness.getIOOperationCallbackFactory(), true, - null, null, null, null, true, harness.getMetadataPageManagerFactory(), false, ITracer.NONE); + null, null, null, null, true, harness.getMetadataPageManagerFactory(), false, ITracer.NONE, + NoOpCompressorDecompressorFactory.INSTANCE); } @Override 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java index 2462c85..3bb1fa4 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/perf/LSMTreeRunner.java @@ -36,9 +36,9 @@ import org.apache.hyracks.storage.am.common.datagen.TupleBatch; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree; import org.apache.hyracks.storage.am.lsm.btree.utils.LSMBTreeUtil; +import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; -import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler; import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicy; @@ -48,6 +48,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache; import org.apache.hyracks.storage.common.IIndexAccessor; import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator; import 
org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; import org.apache.hyracks.test.support.TestStorageManagerComponentHolder; import org.apache.hyracks.test.support.TestUtils; import org.apache.hyracks.util.ExitUtil; @@ -125,7 +126,7 @@ public class LSMTreeRunner implements IExperimentRunner { cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, new NoMergePolicy(), new ThreadCountingTracker(), ioScheduler, NoOpIOOperationCallbackFactory.INSTANCE, true, null, null, null, null, true, TestStorageManagerComponentHolder.getMetadataPageManagerFactory(), false, - ITracer.NONE); + ITracer.NONE, NoOpCompressorDecompressorFactory.INSTANCE); } @Override
