http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java new file mode 100644 index 0000000..3a7e901 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.compression; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * An API for block compressor/decompressor. + * + * Note: Should never allocate any buffer in compress/uncompress operations and it must be stateless to be thread safe. + */ +public interface ICompressorDecompressor { + /** + * Computes the required buffer size for <i>compress()</i>. + * + * @param uBufferSize + * The size of the uncompressed buffer. + * @return The required buffer size for compression + */ + int computeCompressedBufferSize(int uBufferSize); + + /** + * Compress <i>uBuffer</i> into <i>cBuffer</i> + * + * @param uBuffer + * Uncompressed source buffer + * @param cBuffer + * Compressed destination buffer + * @return Buffer after compression. ({@link ByteBuffer#limit()} is set to the compressed size + * @throws HyracksDataException + */ + ByteBuffer compress(ByteBuffer uBuffer, ByteBuffer cBuffer) throws HyracksDataException; + + /** + * Uncompress <i>cBuffer</i> into <i>uBuffer</i> + * + * @param cBuffer + * Compressed source buffer + * @param uBuffer + * Uncompressed destination buffer + * @return Buffer after decompression. ({@link ByteBuffer#limit()} is set to the uncompressed size + * @throws HyracksDataException + * An exception will be thrown if the <i>uBuffer</i> size is not sufficient. + */ + ByteBuffer uncompress(ByteBuffer cBuffer, ByteBuffer uBuffer) throws HyracksDataException; +}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressorFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressorFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressorFactory.java new file mode 100644 index 0000000..b813afb --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/compression/ICompressorDecompressorFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.compression; + +import java.io.Serializable; + +import org.apache.hyracks.api.io.IJsonSerializable; + +/** + * {@link ICompressorDecompressor} factory. + * + * New factory of this interface must implement two methods as well if the compression is intended for storage: + * - {@link IJsonSerializable#toJson(org.apache.hyracks.api.io.IPersistedResourceRegistry)} + * - a static method fromJson(IPersistedResourceRegistry registry, JsonNode json) + */ +public interface ICompressorDecompressorFactory extends Serializable, IJsonSerializable { + /** + * Create a compressor/decompressor instance + * + * @return {@code ICompressorDecompressor} + */ + ICompressorDecompressor createInstance(); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java index 4ded855..8b93d07 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/FileReference.java @@ -27,7 +27,7 @@ import java.util.Date; * Used to identify a file in the local Node Controller. * Only used for files which are stored inside an IO device. */ -public final class FileReference implements Serializable { +public class FileReference implements Serializable { private static final long serialVersionUID = 1L; private final File file; private final IODeviceHandle dev; @@ -90,7 +90,11 @@ public final class FileReference implements Serializable { } public FileReference getChild(String name) { - return new FileReference(dev, path + File.separator + name); + return new FileReference(dev, getChildPath(name)); + } + + public String getChildPath(String name) { + return path + File.separator + name; } public void register() { @@ -111,4 +115,8 @@ public final class FileReference implements Serializable { } registrationTime = 0; } + + public boolean isCompressed() { + return false; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java index 38162c6..333b373 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IPersistedResourceRegistry.java @@ -46,4 +46,16 @@ public interface IPersistedResourceRegistry { * @throws HyracksDataException */ IJsonSerializable deserialize(JsonNode json) throws HyracksDataException; + + /** + * This method must be used for optional fields or newly added fields to ensure back-compatibility + * + * @param json + * @param clazz + * @return A class object of the type id in {@code json} if exists + * or a class object of type <code>clazz</code> otherwise. + * @throws HyracksDataException + */ + IJsonSerializable deserializeOrDefault(JsonNode json, Class<? extends IJsonSerializable> clazz) + throws HyracksDataException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java index 91acea0..b8cf066 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java @@ -29,6 +29,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.SynchronousSchedulerProvid import org.apache.hyracks.storage.am.lsm.common.impls.ThreadCountingOperationTrackerFactory; import org.apache.hyracks.storage.common.IResourceFactory; import org.apache.hyracks.storage.common.IStorageManager; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; import org.apache.hyracks.tests.am.common.LSMTreeOperatorTestHelper; public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper { @@ -46,6 +47,6 @@ public class LSMBTreeOperatorTestHelper extends LSMTreeOperatorTestHelper { NoOpIOOperationCallbackFactory.INSTANCE, pageManagerFactory, getVirtualBufferCacheProvider(), SynchronousSchedulerProvider.INSTANCE, MERGE_POLICY_FACTORY, MERGE_POLICY_PROPERTIES, DURABLE, bloomFilterKeyFields, LSMTreeOperatorTestHelper.DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE, true, - btreefields); + btreefields, NoOpCompressorDecompressorFactory.INSTANCE); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java index 4fc8af9..5d2cd26 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java @@ -1043,9 +1043,9 @@ public class BTree extends AbstractTreeIndex { ((IBTreeLeafFrame) leafFrame).setNextLeaf(leafFrontier.pageId); - queue.put(leafFrontier.page, this); + putInQueue(leafFrontier.page); for (ICachedPage c : pagesToWrite) { - queue.put(c, this); + putInQueue(c); } pagesToWrite.clear(); splitKey.setRightPage(leafFrontier.pageId); @@ -1152,7 +1152,7 @@ public class BTree extends AbstractTreeIndex { ICachedPage lastLeaf = nodeFrontiers.get(level).page; int lastLeafPage = nodeFrontiers.get(level).pageId; lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), nodeFrontiers.get(level).pageId)); - queue.put(lastLeaf, this); + putInQueue(lastLeaf); nodeFrontiers.get(level).page = null; persistFrontiers(level + 1, lastLeafPage); return; @@ -1167,7 +1167,7 @@ public class BTree extends AbstractTreeIndex { ((IBTreeInteriorFrame) interiorFrame).setRightmostChildPageId(rightPage); int finalPageId = freePageManager.takePage(metaFrame); frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(), finalPageId)); - queue.put(frontier.page, this); + putInQueue(frontier.page); frontier.pageId = finalPageId; persistFrontiers(level + 1, finalPageId); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java index 97e7ed7..7d43ed8 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java @@ -33,6 +33,7 @@ import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue; import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; +import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter; import org.apache.hyracks.storage.common.file.BufferedFileHandle; public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager { @@ -221,10 +222,13 @@ public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager } int finalMetaPage = getMaxPageId(metaFrame) + 1; confiscatedPage.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId, finalMetaPage)); + final ICompressedPageWriter compressedPageWriter = bufferCache.getCompressedPageWriter(fileId); + compressedPageWriter.prepareWrite(confiscatedPage); // WARNING: flushing the metadata page should be done after releasing the write latch; otherwise, the page // won't be flushed to disk because it won't be dirty until the write latch has been released. queue.put(confiscatedPage, callback); bufferCache.finishQueue(); + compressedPageWriter.endWriting(); metadataPage = getMetadataPageId(); ready = false; } else if (confiscatedPage != null) { @@ -249,7 +253,8 @@ public class AppendOnlyLinkedMetadataPageManager implements IMetadataPageManager } int pages = bufferCache.getNumPagesOfFile(fileId); if (pages == 0) { - return 0; + //At least there are 2 pages to consider the index is not empty + return IBufferCache.INVALID_PAGEID; } metadataPage = pages - 1; return metadataPage; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java index b77f14f..f83a27d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java @@ -39,6 +39,7 @@ import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue; import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback; +import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter; import org.apache.hyracks.storage.common.file.BufferedFileHandle; public abstract class AbstractTreeIndex implements ITreeIndex { @@ -241,8 +242,9 @@ public abstract class AbstractTreeIndex implements ITreeIndex { // HDFS. Since loading this tree relies on the root page actually being at that point, no further inserts into // that tree are allowed. Currently, this is not enforced. protected boolean releasedLatches; - protected final IFIFOPageQueue queue; + private final IFIFOPageQueue queue; protected List<ICachedPage> pagesToWrite; + private final ICompressedPageWriter compressedPageWriter; public AbstractTreeIndexBulkLoader(float fillFactor) throws HyracksDataException { leafFrame = leafFrameFactory.createFrame(); @@ -278,10 +280,12 @@ public abstract class AbstractTreeIndex implements ITreeIndex { nodeFrontiers.add(leafFrontier); pagesToWrite = new ArrayList<>(); + compressedPageWriter = bufferCache.getCompressedPageWriter(fileId); } - protected void handleException() throws HyracksDataException { + protected void handleException() { // Unlatch and unpin pages that weren't in the queue to avoid leaking memory. + compressedPageWriter.abort(); for (NodeFrontier nodeFrontier : nodeFrontiers) { ICachedPage frontierPage = nodeFrontier.page; if (frontierPage.confiscated()) { @@ -296,10 +300,10 @@ public abstract class AbstractTreeIndex implements ITreeIndex { @Override public void end() throws HyracksDataException { - bufferCache.finishQueue(); if (hasFailed()) { throw HyracksDataException.create(getFailure()); } + bufferCache.finishQueue(); freePageManager.setRootPageId(rootPage); } @@ -320,6 +324,12 @@ public abstract class AbstractTreeIndex implements ITreeIndex { public void setLeafFrame(ITreeIndexFrame leafFrame) { this.leafFrame = leafFrame; } + + public void putInQueue(ICachedPage cPage) throws HyracksDataException { + compressedPageWriter.prepareWrite(cPage); + queue.put(cPage, this); + } + } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java index 76f7e61..89ccfed 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResource.java @@ -36,6 +36,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; import org.apache.hyracks.storage.common.IStorageManager; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -55,12 +56,13 @@ public class ExternalBTreeLocalResource extends LSMBTreeLocalResource { super(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, path, storageManager, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, null, - ioSchedulerProvider, durable); + ioSchedulerProvider, durable, NoOpCompressorDecompressorFactory.INSTANCE); } private ExternalBTreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields) throws HyracksDataException { - super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields); + super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields, + NoOpCompressorDecompressorFactory.INSTANCE); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java index a4c24c9..555f641 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeLocalResourceFactory.java @@ -30,6 +30,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource; import org.apache.hyracks.storage.common.IStorageManager; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; public class ExternalBTreeLocalResourceFactory extends LSMBTreeLocalResourceFactory { @@ -46,7 +47,7 @@ public class ExternalBTreeLocalResourceFactory extends LSMBTreeLocalResourceFact super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields, opTrackerFactory, ioOpCallbackFactory, metadataPageManagerFactory, null, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable, bloomFilterKeyFields, bloomFilterFalsePositiveRate, - isPrimary, btreeFields); + isPrimary, btreeFields, NoOpCompressorDecompressorFactory.INSTANCE); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java index 2a57e74..ddf0955 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResource.java @@ -36,6 +36,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; import org.apache.hyracks.storage.common.IStorageManager; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -58,13 +59,14 @@ public class ExternalBTreeWithBuddyLocalResource extends LSMBTreeLocalResource { super(typeTraits, cmpFactories, buddyBtreeFields, bloomFilterFalsePositiveRate, isPrimary, path, storageManager, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, null, - ioSchedulerProvider, durable); + ioSchedulerProvider, durable, NoOpCompressorDecompressorFactory.INSTANCE); } private ExternalBTreeWithBuddyLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields) throws HyracksDataException { - super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields); + super(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, btreeFields, + NoOpCompressorDecompressorFactory.INSTANCE); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java index 2aff61a..89e5154 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/ExternalBTreeWithBuddyLocalResourceFactory.java @@ -30,6 +30,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory; import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource; import org.apache.hyracks.storage.common.IStorageManager; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; public class ExternalBTreeWithBuddyLocalResourceFactory extends LSMBTreeLocalResourceFactory { @@ -46,7 +47,7 @@ public class ExternalBTreeWithBuddyLocalResourceFactory extends LSMBTreeLocalRes super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, null, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable, buddyBtreeFields, bloomFilterFalsePositiveRate, - isPrimary, btreeFields); + isPrimary, btreeFields, NoOpCompressorDecompressorFactory.INSTANCE); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java index 40278d0..7d5beff 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.compression.ICompressorDecompressorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -40,6 +41,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCacheProvider; import org.apache.hyracks.storage.am.lsm.common.dataflow.LsmResource; import org.apache.hyracks.storage.common.IStorageManager; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -52,6 +54,7 @@ public class LSMBTreeLocalResource extends LsmResource { protected final double bloomFilterFalsePositiveRate; protected final boolean isPrimary; protected final int[] btreeFields; + protected final ICompressorDecompressorFactory compressorDecompressorFactory; public LSMBTreeLocalResource(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories, int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, boolean isPrimary, String path, @@ -60,7 +63,8 @@ public class LSMBTreeLocalResource extends LsmResource { IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields, ILSMOperationTrackerFactory opTrackerProvider, ILSMIOOperationCallbackFactory ioOpCallbackFactory, IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider, - ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable) { + ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean durable, + ICompressorDecompressorFactory compressorDecompressorFactory) { super(path, storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields, opTrackerProvider, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable); @@ -68,15 +72,18 @@ public class LSMBTreeLocalResource extends LsmResource { this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate; this.isPrimary = isPrimary; this.btreeFields = btreeFields; + this.compressorDecompressorFactory = compressorDecompressorFactory; } protected LSMBTreeLocalResource(IPersistedResourceRegistry registry, JsonNode json, int[] bloomFilterKeyFields, - double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields) throws HyracksDataException { + double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields, + ICompressorDecompressorFactory compressorDecompressorFactory) throws HyracksDataException { super(registry, json); this.bloomFilterKeyFields = bloomFilterKeyFields; this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate; this.isPrimary = isPrimary; this.btreeFields = btreeFields; + this.compressorDecompressorFactory = compressorDecompressorFactory; } @Override @@ -92,7 +99,8 @@ public class LSMBTreeLocalResource extends LsmResource { mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx), opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx), ioOpCallbackFactory, isPrimary, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, - durable, metadataPageManagerFactory, updateAware, serviceCtx.getTracer()); + durable, metadataPageManagerFactory, updateAware, serviceCtx.getTracer(), + compressorDecompressorFactory); } @Override @@ -108,8 +116,11 @@ public class LSMBTreeLocalResource extends LsmResource { final double bloomFilterFalsePositiveRate = json.get("bloomFilterFalsePositiveRate").asDouble(); final boolean isPrimary = json.get("isPrimary").asBoolean(); final int[] btreeFields = OBJECT_MAPPER.convertValue(json.get("btreeFields"), int[].class); + final JsonNode compressorDecompressorNode = json.get("compressorDecompressorFactory"); + final ICompressorDecompressorFactory compDecompFactory = (ICompressorDecompressorFactory) registry + .deserializeOrDefault(compressorDecompressorNode, NoOpCompressorDecompressorFactory.class); return new LSMBTreeLocalResource(registry, json, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, - btreeFields); + btreeFields, compDecompFactory); } @Override @@ -120,5 +131,6 @@ public class LSMBTreeLocalResource extends LsmResource { json.put("bloomFilterFalsePositiveRate", bloomFilterFalsePositiveRate); json.put("isPrimary", isPrimary); json.putPOJO("btreeFields", btreeFields); + json.putPOJO("compressorDecompressorFactory", compressorDecompressorFactory.toJson(registry)); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java index 5fae5b9..ea41c3d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java @@ -20,6 +20,7 @@ package org.apache.hyracks.storage.am.lsm.btree.dataflow; import java.util.Map; +import org.apache.hyracks.api.compression.ICompressorDecompressorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.io.FileReference; @@ -40,6 +41,7 @@ public class LSMBTreeLocalResourceFactory extends LsmResourceFactory { protected final double bloomFilterFalsePositiveRate; protected final boolean isPrimary; protected final int[] btreeFields; + protected final ICompressorDecompressorFactory compressorDecompressorFactory; public LSMBTreeLocalResourceFactory(IStorageManager storageManager, ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories, ITypeTraits[] filterTypeTraits, @@ -48,7 +50,8 @@ public class LSMBTreeLocalResourceFactory extends LsmResourceFactory { IMetadataPageManagerFactory metadataPageManagerFactory, IVirtualBufferCacheProvider vbcProvider, ILSMIOOperationSchedulerProvider ioSchedulerProvider, ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, boolean durable, int[] bloomFilterKeyFields, - double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields) { + double bloomFilterFalsePositiveRate, boolean isPrimary, int[] btreeFields, + ICompressorDecompressorFactory compressorDecompressorFactory) { super(storageManager, typeTraits, cmpFactories, filterTypeTraits, filterCmpFactories, filterFields, opTrackerFactory, ioOpCallbackFactory, metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, mergePolicyFactory, mergePolicyProperties, durable); @@ -56,6 +59,7 @@ public class LSMBTreeLocalResourceFactory extends LsmResourceFactory { this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate; this.isPrimary = isPrimary; this.btreeFields = btreeFields; + this.compressorDecompressorFactory = compressorDecompressorFactory; } @Override @@ -63,6 +67,6 @@ public class LSMBTreeLocalResourceFactory extends LsmResourceFactory { return new LSMBTreeLocalResource(typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary, fileRef.getRelativePath(), storageManager, mergePolicyFactory, mergePolicyProperties, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, opTrackerProvider, ioOpCallbackFactory, - metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable); + metadataPageManagerFactory, vbcProvider, ioSchedulerProvider, durable, compressorDecompressorFactory); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java index 0fc79eb..9169cbf 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java @@ -21,12 +21,14 @@ package org.apache.hyracks.storage.am.lsm.btree.impls; import java.util.HashSet; import java.util.Set; +import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.btree.impls.BTree; import org.apache.hyracks.storage.am.btree.impls.DiskBTree; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; +import org.apache.hyracks.storage.common.compression.file.CompressedFileReference; public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent { protected final DiskBTree btree; @@ -80,7 +82,12 @@ public class LSMBTreeDiskComponent extends AbstractLSMDiskComponent { static Set<String> getFiles(BTree btree) { Set<String> files = new HashSet<>(); - files.add(btree.getFileReference().getFile().getAbsolutePath()); + final FileReference fileRef = btree.getFileReference(); + files.add(fileRef.getAbsolutePath()); + if (fileRef.isCompressed()) { + final CompressedFileReference cFileRef = (CompressedFileReference) fileRef; + files.add(cFileRef.getLAFAbsolutePath()); + } return files; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java index 2240fd9..ca5f968 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import org.apache.hyracks.api.compression.ICompressorDecompressorFactory; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; @@ -37,6 +38,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManage import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; public class LSMBTreeFileManager extends AbstractLSMIndexFileManager { @@ -46,24 +48,30 @@ public class LSMBTreeFileManager extends AbstractLSMIndexFileManager { private final boolean hasBloomFilter; public LSMBTreeFileManager(IIOManager ioManager, FileReference file, - TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter) { - super(ioManager, file, null); + TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter, + ICompressorDecompressorFactory compressorDecompressorFactory) { + super(ioManager, file, null, compressorDecompressorFactory); this.btreeFactory = btreeFactory; this.hasBloomFilter = hasBloomFilter; } + public LSMBTreeFileManager(IIOManager ioManager, FileReference file, + TreeIndexFactory<? extends ITreeIndex> btreeFactory, boolean hasBloomFilter) { + this(ioManager, file, btreeFactory, hasBloomFilter, NoOpCompressorDecompressorFactory.INSTANCE); + } + @Override public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException { String baseName = getNextComponentSequence(btreeFilter); - return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null, - hasBloomFilter ? baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null); + return new LSMComponentFileReferences(getFileReference(baseName + DELIMITER + BTREE_SUFFIX), null, + hasBloomFilter ? getFileReference(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null); } @Override public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) { final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName); - return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null, - hasBloomFilter ? baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null); + return new LSMComponentFileReferences(getFileReference(baseName + DELIMITER + BTREE_SUFFIX), null, + hasBloomFilter ? getFileReference(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java index d071bac..70ac3f6 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java @@ -21,6 +21,7 @@ package org.apache.hyracks.storage.am.lsm.btree.utils; import java.util.List; +import org.apache.hyracks.api.compression.ICompressorDecompressorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -72,8 +73,8 @@ public class LSMBTreeUtil { ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, boolean needKeyDupCheck, ITypeTraits[] filterTypeTraits, IBinaryComparatorFactory[] filterCmpFactories, int[] btreeFields, int[] filterFields, boolean durable, - IMetadataPageManagerFactory freePageManagerFactory, boolean updateAware, ITracer tracer) - throws HyracksDataException { + IMetadataPageManagerFactory freePageManagerFactory, boolean updateAware, ITracer tracer, + ICompressorDecompressorFactory compressorDecompressorFactory) throws HyracksDataException { LSMBTreeTupleWriterFactory insertTupleWriterFactory = new LSMBTreeTupleWriterFactory(typeTraits, cmpFactories.length, false, updateAware); LSMBTreeTupleWriterFactory deleteTupleWriterFactory = @@ -106,10 +107,10 @@ public class LSMBTreeUtil { filterManager = new LSMComponentFilterManager(filterFrameFactory); } - //Primary LSMBTree index has a BloomFilter. - ILSMIndexFileManager fileNameManager = - new LSMBTreeFileManager(ioManager, file, diskBTreeFactory, needKeyDupCheck); + ILSMIndexFileManager fileNameManager = new LSMBTreeFileManager(ioManager, file, diskBTreeFactory, + needKeyDupCheck, compressorDecompressorFactory); + //Primary LSMBTree index has a BloomFilter. ILSMDiskComponentFactory componentFactory; ILSMDiskComponentFactory bulkLoadComponentFactory; if (needKeyDupCheck) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java index 904029b..7618264 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java @@ -27,6 +27,8 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; +import org.apache.hyracks.api.compression.ICompressorDecompressor; +import org.apache.hyracks.api.compression.ICompressorDecompressorFactory; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; @@ -38,6 +40,9 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICachedPage; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressor; +import org.apache.hyracks.storage.common.compression.NoOpCompressorDecompressorFactory; +import org.apache.hyracks.storage.common.compression.file.CompressedFileReference; import org.apache.hyracks.storage.common.file.BufferedFileHandle; import org.apache.hyracks.util.annotations.NotThreadSafe; @@ -71,6 +76,10 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage */ public static final String DELETE_TREE_SUFFIX = "d"; /** + * Indicates Look Aside File (LAF) for compressed indexes + */ + public static final String LAF_SUFFIX = ".dic"; + /** * Hides transaction components until they are either committed by removing this file or deleted along with the file */ public static final String TXN_PREFIX = ".T"; @@ -88,12 +97,20 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage protected final Comparator<IndexComponentFileReference> recencyCmp = new RecencyComparator(); protected final TreeIndexFactory<? extends ITreeIndex> treeFactory; private long lastUsedComponentSeq = UNINITALIZED_COMPONENT_SEQ; + private final ICompressorDecompressorFactory compressorDecompressorFactory; public AbstractLSMIndexFileManager(IIOManager ioManager, FileReference file, TreeIndexFactory<? extends ITreeIndex> treeFactory) { + this(ioManager, file, treeFactory, NoOpCompressorDecompressorFactory.INSTANCE); + } + + public AbstractLSMIndexFileManager(IIOManager ioManager, FileReference file, + TreeIndexFactory<? extends ITreeIndex> treeFactory, + ICompressorDecompressorFactory compressorDecompressorFactory) { this.ioManager = ioManager; this.baseDir = file; this.treeFactory = treeFactory; + this.compressorDecompressorFactory = compressorDecompressorFactory; } protected TreeIndexState isValidTreeIndex(ITreeIndex treeIndex) throws HyracksDataException { @@ -131,7 +148,7 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage IBufferCache bufferCache) throws HyracksDataException { String[] files = listDirFiles(baseDir, filter); for (String fileName : files) { - FileReference fileRef = baseDir.getChild(fileName); + FileReference fileRef = getFileReference(fileName); if (treeFactory == null) { allFiles.add(IndexComponentFileReference.of(fileRef)); continue; @@ -362,6 +379,21 @@ public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManage return IndexComponentFileReference.getFlushSequence(++lastUsedComponentSeq); } + protected FileReference getFileReference(String name) { + final ICompressorDecompressor compDecomp = compressorDecompressorFactory.createInstance(); + //Avoid creating LAF file for NoOpCompressorDecompressor + if (compDecomp != NoOpCompressorDecompressor.INSTANCE && isCompressible(name)) { + final String path = baseDir.getChildPath(name); + return new CompressedFileReference(baseDir.getDeviceHandle(), compDecomp, path, path + LAF_SUFFIX); + } + + return baseDir.getChild(name); + } + + private boolean isCompressible(String fileName) { + return !fileName.endsWith(BLOOM_FILTER_SUFFIX) && !fileName.endsWith(DELETE_TREE_SUFFIX); + } + private long getOnDiskLastUsedComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException { long maxComponentSeq = -1; final String[] files = listDirFiles(baseDir, filenameFilter); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java index 635fe7a..b34a13c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java @@ -941,9 +941,10 @@ public class RTree extends AbstractTreeIndex { propagateBulk(1, false, pagesToWrite); leafFrontier.pageId = freePageManager.takePage(metaFrame); - queue.put(leafFrontier.page, this); + + putInQueue(leafFrontier.page); for (ICachedPage c : pagesToWrite) { - queue.put(c, this); + putInQueue(c); } pagesToWrite.clear(); leafFrontier.page = bufferCache @@ -974,7 +975,7 @@ public class RTree extends AbstractTreeIndex { } for (ICachedPage c : pagesToWrite) { - queue.put(c, this); + putInQueue(c); } finish(); super.end(); @@ -1011,7 +1012,7 @@ public class RTree extends AbstractTreeIndex { ((RTreeNSMFrame) lowerFrame).adjustMBR(); interiorFrameTupleWriter.writeTupleFields(((RTreeNSMFrame) lowerFrame).getMBRTuples(), 0, mbr, 0); } - queue.put(n.page, this); + putInQueue(n.page); n.page = null; prevPageId = n.pageId; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml index 9c50f2d..423925b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/pom.xml @@ -66,5 +66,10 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>1.1.7.1</version> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java new file mode 100644 index 0000000..47e7534 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/AbstractBufferedFileIOManager.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.common.buffercache; + +import java.nio.ByteBuffer; +import java.util.concurrent.BlockingQueue; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IFileHandle; +import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.api.util.IoUtil; +import org.apache.hyracks.storage.common.compression.file.CompressedFileReference; +import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter; +import org.apache.hyracks.util.annotations.NotThreadSafe; + +/** + * Handles all IO operations for a specified file. + */ +@NotThreadSafe +public abstract class AbstractBufferedFileIOManager { + private static final String ERROR_MESSAGE = "%s unexpected number of bytes: [expected: %d, actual: %d, file: %s]"; + private static final String READ = "Read"; + private static final String WRITE = "Written"; + + protected final BufferCache bufferCache; + protected final IPageReplacementStrategy pageReplacementStrategy; + private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache; + private final IIOManager ioManager; + + private IFileHandle fileHandle; + private volatile boolean hasOpen; + + protected AbstractBufferedFileIOManager(BufferCache bufferCache, IIOManager ioManager, + BlockingQueue<BufferCacheHeaderHelper> headerPageCache, IPageReplacementStrategy pageReplacementStrategy) { + this.bufferCache = bufferCache; + this.ioManager = ioManager; + this.headerPageCache = headerPageCache; + this.pageReplacementStrategy = pageReplacementStrategy; + hasOpen = false; + } + + /* ******************************** + * Read/Write page methods + * ******************************** + */ + + /** + * Read the CachedPage from disk + * + * @param cPage + * CachedPage in {@link BufferCache} + * @throws HyracksDataException + */ + public abstract void read(CachedPage cPage) throws HyracksDataException; + + /** + * Write the CachedPage into disk + * + * @param cPage + * CachedPage in {@link BufferCache} + * @throws HyracksDataException + */ + public void write(CachedPage cPage) throws HyracksDataException { + final int totalPages = cPage.getFrameSizeMultiplier(); + final int extraBlockPageId = cPage.getExtraBlockPageId(); + final BufferCacheHeaderHelper header = checkoutHeaderHelper(); + write(cPage, header, totalPages, extraBlockPageId); + } + + /** + * Write the CachedPage into disk called by {@link AbstractBufferedFileIOManager#write(CachedPage)} + * Note: It is the responsibility of the caller to return {@link BufferCacheHeaderHelper} + * + * @param cPage + * CachedPage that will be written + * @param header + * HeaderHelper to add into the written page + * @param totalPages + * Number of pages to be written + * @param extraBlockPageId + * Extra page ID in case it has more than one page + * @throws HyracksDataException + */ + protected abstract void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages, + int extraBlockPageId) throws HyracksDataException; + + /* ******************************** + * File operations' methods + * ******************************** + */ + + /** + * Open the file + * + * @throws HyracksDataException + */ + public void open(FileReference fileRef) throws HyracksDataException { + fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE, + IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); + hasOpen = true; + } + + /** + * Close the file + * + * @throws HyracksDataException + */ + public void close() throws HyracksDataException { + if (hasOpen) { + ioManager.close(fileHandle); + } + } + + public void purge() throws HyracksDataException { + ioManager.close(fileHandle); + } + + /** + * Force the file into disk + * + * @param metadata + * see {@link java.nio.channels.FileChannel#force(boolean)} + * @throws HyracksDataException + */ + public void force(boolean metadata) throws HyracksDataException { + ioManager.sync(fileHandle, metadata); + } + + /** + * Get the number of pages in the file + * + * @throws HyracksDataException + */ + public abstract int getNumberOfPages() throws HyracksDataException; + + public void markAsDeleted() throws HyracksDataException { + fileHandle = null; + } + + /** + * Check whether the file has been deleted + * + * @return + * true if has been deleted, false o.w + */ + public boolean hasBeenDeleted() { + return fileHandle == null; + } + + /** + * Check whether the file has ever been opened + * + * @return + * true if has ever been open, false o.w + */ + public final boolean hasBeenOpened() { + return hasOpen; + } + + public final FileReference getFileReference() { + return fileHandle.getFileReference(); + } + + public static void createFile(BufferCache bufferCache, FileReference fileRef) throws HyracksDataException { + IoUtil.create(fileRef); + if (fileRef.isCompressed()) { + final CompressedFileReference cFileRef = (CompressedFileReference) fileRef; + try { + bufferCache.createFile(cFileRef.getLAFFileReference()); + } catch (HyracksDataException e) { + //In case of creating the LAF file failed, delete fileRef + IoUtil.delete(fileRef); + throw e; + } + } + } + + public static void deleteFile(FileReference fileRef) throws HyracksDataException { + IoUtil.delete(fileRef); + if (fileRef.isCompressed()) { + final CompressedFileReference cFileRef = (CompressedFileReference) fileRef; + if (cFileRef.getFile().exists()) { + IoUtil.delete(cFileRef.getLAFFileReference()); + } + } + } + + /* ******************************** + * Compressed file methods + * ******************************** + */ + + public abstract ICompressedPageWriter getCompressedPageWriter(); + + /* ******************************** + * Common helper methods + * ******************************** + */ + + /** + * Get the offset for the first page + * + * @param cPage + * CachedPage for which the offset is needed + * @return + * page offset in the file + */ + protected abstract long getFirstPageOffset(CachedPage cPage); + + /** + * Get the offset for the extra page + * + * @param cPage + * CachedPage for which the offset is needed + * @return + * page offset in the file + */ + protected abstract long getExtraPageOffset(CachedPage cPage); + + protected final BufferCacheHeaderHelper checkoutHeaderHelper() { + BufferCacheHeaderHelper helper = headerPageCache.poll(); + if (helper == null) { + helper = new BufferCacheHeaderHelper(bufferCache.getPageSize()); + } + return helper; + } + + protected final void returnHeaderHelper(BufferCacheHeaderHelper buffer) { + headerPageCache.offer(buffer); //NOSONAR + } + + protected final long readToBuffer(ByteBuffer buf, long offset) throws HyracksDataException { + return ioManager.syncRead(fileHandle, offset, buf); + } + + protected final long writeToFile(ByteBuffer buf, long offset) throws HyracksDataException { + return ioManager.syncWrite(fileHandle, offset, buf); + } + + protected final long writeToFile(ByteBuffer[] buf, long offset) throws HyracksDataException { + return ioManager.syncWrite(fileHandle, offset, buf); + } + + protected final long getFileSize() { + return ioManager.getSize(fileHandle); + } + + protected final void verifyBytesWritten(long expected, long actual) { + if (expected != actual) { + throwException(WRITE, expected, actual); + } + } + + protected final boolean verifyBytesRead(long expected, long actual) { + if (expected != actual) { + if (actual == -1) { + // disk order scan code seems to rely on this behavior, so silently return + return false; + } else { + throwException(READ, expected, actual); + } + } + return true; + } + + protected void throwException(String op, long expected, long actual) { + final String path = fileHandle.getFileReference().getAbsolutePath(); + throw new IllegalStateException(String.format(ERROR_MESSAGE, op, expected, actual, path)); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5aeba9b4/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java index 1e3f85b..7441395 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/BufferCache.java @@ -22,7 +22,6 @@ import static org.apache.hyracks.control.nc.io.IOManager.IO_REQUEST_QUEUE_SIZE; import java.io.IOException; import java.io.OutputStream; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -42,12 +41,12 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.api.io.IFileHandle; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.api.replication.IIOReplicationManager; import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.api.util.IoUtil; +import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter; import org.apache.hyracks.storage.common.file.BufferedFileHandle; import org.apache.hyracks.storage.common.file.IFileMapManager; import org.apache.logging.log4j.Level; @@ -80,18 +79,18 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache = new ArrayBlockingQueue<>(IO_REQUEST_QUEUE_SIZE); + private IIOReplicationManager ioReplicationManager; + private final List<ICachedPageInternal> cachedPages = new ArrayList<>(); + private final AtomicLong masterPinCount = new AtomicLong(); + + private boolean closed; + //DEBUG private static final Level fileOpsLevel = Level.TRACE; private ArrayList<CachedPage> confiscatedPages; private Lock confiscateLock; private HashMap<CachedPage, StackTraceElement[]> confiscatedPagesOwner; private ConcurrentHashMap<CachedPage, StackTraceElement[]> pinnedPageOwner; - //!DEBUG - private IIOReplicationManager ioReplicationManager; - private final List<ICachedPageInternal> cachedPages = new ArrayList<>(); - private final AtomicLong masterPinCount = new AtomicLong(); - - private boolean closed; public BufferCache(IIOManager ioManager, IPageReplacementStrategy pageReplacementStrategy, IPageCleanerPolicy pageCleanerPolicy, IFileMapManager fileMapManager, int maxOpenFiles, @@ -158,7 +157,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { synchronized (fileInfoMap) { fInfo = fileInfoMap.get(fileId); } - if (fInfo == null) { + if (fInfo == null || fInfo.hasBeenDeleted() || !fInfo.hasBeenOpened()) { throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been created."); } else if (fInfo.getReferenceCount() <= 0) { throw new HyracksDataException("pin called on a fileId " + fileId + " that has not been opened."); @@ -546,35 +545,9 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { } private void read(CachedPage cPage) throws HyracksDataException { - BufferedFileHandle fInfo = getFileInfo(cPage); + BufferedFileHandle fInfo = getFileHandle(cPage); cPage.buffer.clear(); - BufferCacheHeaderHelper header = checkoutHeaderHelper(); - try { - long bytesRead = ioManager.syncRead(fInfo.getFileHandle(), - getOffsetForPage(BufferedFileHandle.getPageId(cPage.dpid)), header.prepareRead()); - - if (bytesRead != getPageSizeWithHeader()) { - if (bytesRead == -1) { - // disk order scan code seems to rely on this behavior, so silently return - return; - } - throw new HyracksDataException("Failed to read a complete page: " + bytesRead); - } - int totalPages = header.processRead(cPage); - - if (totalPages > 1) { - pageReplacementStrategy.fixupCapacityOnLargeRead(cPage); - cPage.buffer.position(pageSize); - cPage.buffer.limit(totalPages * pageSize); - ioManager.syncRead(fInfo.getFileHandle(), getOffsetForPage(cPage.getExtraBlockPageId()), cPage.buffer); - } - } finally { - returnHeaderHelper(header); - } - } - - private long getOffsetForPage(long pageId) { - return pageId * getPageSizeWithHeader(); + fInfo.read(cPage); } @Override @@ -583,67 +556,16 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { pageReplacementStrategy.resizePage((ICachedPageInternal) cPage, totalPages, extraPageBlockHelper); } - BufferedFileHandle getFileInfo(CachedPage cPage) throws HyracksDataException { - return getFileInfo(BufferedFileHandle.getFileId(cPage.dpid)); - } - - BufferedFileHandle getFileInfo(int fileId) throws HyracksDataException { - BufferedFileHandle fInfo; - synchronized (fileInfoMap) { - fInfo = fileInfoMap.get(fileId); - } - if (fInfo == null) { - throw HyracksDataException.create(ErrorCode.FILE_DOES_NOT_EXIST, fileId); - } - return fInfo; - } - - private BufferCacheHeaderHelper checkoutHeaderHelper() { - BufferCacheHeaderHelper helper = headerPageCache.poll(); - if (helper == null) { - helper = new BufferCacheHeaderHelper(pageSize); - } - return helper; - } - - private void returnHeaderHelper(BufferCacheHeaderHelper buffer) { - headerPageCache.offer(buffer); - } - void write(CachedPage cPage) throws HyracksDataException { - BufferedFileHandle fInfo = getFileInfo(cPage); + BufferedFileHandle fInfo = getFileHandle(cPage); // synchronize on fInfo to prevent the file handle from being deleted until the page is written. synchronized (fInfo) { - if (fInfo.fileHasBeenDeleted()) { + if (fInfo.hasBeenDeleted()) { return; } - ByteBuffer buf = cPage.buffer.duplicate(); - final int totalPages = cPage.getFrameSizeMultiplier(); - final int extraBlockPageId = cPage.getExtraBlockPageId(); - final boolean contiguousLargePages = (BufferedFileHandle.getPageId(cPage.dpid) + 1) == extraBlockPageId; - BufferCacheHeaderHelper header = checkoutHeaderHelper(); - try { - buf.limit(contiguousLargePages ? pageSize * totalPages : pageSize); - buf.position(0); - long bytesWritten = ioManager.syncWrite(fInfo.getFileHandle(), - getOffsetForPage(BufferedFileHandle.getPageId(cPage.dpid)), header.prepareWrite(cPage, buf)); - - if (bytesWritten != (contiguousLargePages ? pageSize * (totalPages - 1) : 0) - + getPageSizeWithHeader()) { - throw new HyracksDataException("Failed to write completely: " + bytesWritten); - } - } finally { - returnHeaderHelper(header); - } - if (totalPages > 1 && !contiguousLargePages) { - buf.limit(totalPages * pageSize); - ioManager.syncWrite(fInfo.getFileHandle(), getOffsetForPage(extraBlockPageId), buf); - } - if (buf.capacity() != pageSize * totalPages) { - throw new IllegalStateException("Illegal number of bytes written, expected bytes written: " - + pageSize * totalPages + " actual bytes writte: " + buf.capacity()); - } + fInfo.write(cPage); } + } @Override @@ -794,8 +716,8 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { synchronized (fileInfoMap) { fileInfoMap.forEach((key, value) -> { try { - sweepAndFlush(key, true); - ioManager.close(value.getFileHandle()); + sweepAndFlush(value, true); + value.close(); } catch (HyracksDataException e) { if (LOGGER.isWarnEnabled()) { LOGGER.log(Level.WARN, "Error flushing file id: " + key, e); @@ -811,11 +733,14 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { if (LOGGER.isEnabled(fileOpsLevel)) { LOGGER.log(fileOpsLevel, "Creating file: " + fileRef + " in cache: " + this); } - IoUtil.create(fileRef); + BufferedFileHandle.createFile(this, fileRef); + int fileId; try { synchronized (fileInfoMap) { - return fileMapManager.registerFile(fileRef); + fileId = fileMapManager.registerFile(fileRef); + getOrCreateFileHandle(fileId); } + return fileId; } catch (Exception e) { // If file registration failed for any reason, we need to undo the file creation try { @@ -851,11 +776,14 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { } try { final BufferedFileHandle fInfo = getOrCreateFileHandle(fileId); - if (fInfo.getFileHandle() == null) { + //CompressedFileReference may open another file which may sweep and close out this fInfo + fInfo.incReferenceCount(); + + if (!fInfo.hasBeenOpened()) { // a new file synchronized (fInfo) { // prevent concurrent opening of the same file - if (fInfo.getFileHandle() == null) { + if (!fInfo.hasBeenOpened()) { if (fileInfoMap.size() > maxOpenFiles) { closeOpeningFiles(fInfo); } @@ -864,15 +792,12 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { synchronized (fileInfoMap) { fileRef = fileMapManager.lookupFileName(fileId); } - IFileHandle fh = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE, - IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); - fInfo.setFileHandle(fh); + fInfo.open(fileRef); } } } - fInfo.incReferenceCount(); } catch (Exception e) { - removeFileInfo(fileId); + removeFileHandle(fileId); throw HyracksDataException.create(e); } } @@ -888,11 +813,11 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { if (fh != newFileHandle && fh.getReferenceCount() <= 0) { if (fh.getReferenceCount() < 0) { throw new IllegalStateException("Illegal reference count " + fh.getReferenceCount() - + " of file " + fh.getFileHandle().getFileReference()); + + " of file " + fh.getFileReference()); } int entryFileId = entry.getKey(); - sweepAndFlush(entryFileId, true); - ioManager.close(entry.getValue().getFileHandle()); + sweepAndFlush(fh, true); + entry.getValue().close(); fileInfoMap.remove(entryFileId); unreferencedFileFound = true; // for-each iterator is invalid because we changed @@ -908,7 +833,12 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { } } - private void sweepAndFlush(int fileId, boolean flushDirtyPages) throws HyracksDataException { + private void sweepAndFlush(BufferedFileHandle fInfo, boolean flushDirtyPages) throws HyracksDataException { + if (!fInfo.hasBeenOpened()) { + //Skip flushing as the file has not been open + return; + } + final int fileId = fInfo.getFileId(); for (final CacheBucket bucket : pageMap) { bucket.bucketLock.lock(); try { @@ -973,7 +903,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { synchronized (fileInfoMap) { BufferedFileHandle fInfo = fileInfoMap.get(fileId); - if (fInfo == null) { + if (fInfo == null || !fInfo.hasBeenOpened()) { throw new HyracksDataException("Closing unopened file"); } if (fInfo.decReferenceCount() < 0) { @@ -997,7 +927,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { synchronized (fileInfoMap) { fInfo = fileInfoMap.get(fileId); } - ioManager.sync(fInfo.getFileHandle(), metadata); + fInfo.force(metadata); } @Override @@ -1013,7 +943,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { if (mapped) { deleteFile(fileId); } else { - IoUtil.delete(fileRef); + BufferedFileHandle.deleteFile(fileRef); } } @@ -1022,11 +952,11 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { if (LOGGER.isEnabled(fileOpsLevel)) { LOGGER.log(fileOpsLevel, "Deleting file: " + fileId + " in cache: " + this); } - BufferedFileHandle fInfo = removeFileInfo(fileId); + BufferedFileHandle fInfo = removeFileHandle(fileId); if (fInfo == null) { return; } - sweepAndFlush(fileId, false); + sweepAndFlush(fInfo, false); try { if (fInfo.getReferenceCount() > 0) { throw new HyracksDataException("Deleting open file"); @@ -1040,11 +970,11 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { } finally { try { synchronized (fInfo) { - ioManager.close(fInfo.getFileHandle()); + fInfo.close(); fInfo.markAsDeleted(); } } finally { - IoUtil.delete(fileRef); + BufferedFileHandle.deleteFile(fileRef); } } } @@ -1178,10 +1108,7 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { if (fInfo == null) { throw new HyracksDataException("No such file mapped for fileId:" + fileId); } - if (DEBUG) { - assert ioManager.getSize(fInfo.getFileHandle()) % getPageSizeWithHeader() == 0; - } - return (int) (ioManager.getSize(fInfo.getFileHandle()) / getPageSizeWithHeader()); + return fInfo.getNumberOfPages(); } } @@ -1290,18 +1217,35 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { return null; } - private BufferedFileHandle getOrCreateFileHandle(int fileId) { + private BufferedFileHandle getOrCreateFileHandle(int fileId) throws HyracksDataException { synchronized (fileInfoMap) { - return fileInfoMap.computeIfAbsent(fileId, id -> new BufferedFileHandle(fileId, null)); + final FileReference fileRef = fileMapManager.lookupFileName(fileId); + return fileInfoMap.computeIfAbsent(fileId, id -> BufferedFileHandle.create(fileRef, fileId, this, ioManager, + headerPageCache, pageReplacementStrategy)); } } - private BufferedFileHandle removeFileInfo(int fileId) { + private BufferedFileHandle removeFileHandle(int fileId) { synchronized (fileInfoMap) { return fileInfoMap.remove(fileId); } } + private BufferedFileHandle getFileHandle(CachedPage cPage) throws HyracksDataException { + return getFileHandle(BufferedFileHandle.getFileId(cPage.dpid)); + } + + private BufferedFileHandle getFileHandle(int fileId) throws HyracksDataException { + BufferedFileHandle fInfo; + synchronized (fileInfoMap) { + fInfo = fileInfoMap.get(fileId); + } + if (fInfo == null) { + throw HyracksDataException.create(ErrorCode.FILE_DOES_NOT_EXIST, fileId); + } + return fInfo; + } + private ICachedPage getPageLoop(long dpid, int multiplier, boolean confiscate) throws HyracksDataException { final long startingPinCount = DEBUG ? masterPinCount.get() : -1; int cycleCount = 0; @@ -1442,54 +1386,16 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { @Override public void purgeHandle(int fileId) throws HyracksDataException { - BufferedFileHandle fh = removeFileInfo(fileId); + BufferedFileHandle fh = removeFileHandle(fileId); if (fh != null) { synchronized (fileInfoMap) { fileMapManager.unregisterFile(fileId); + fh.purge(); } - ioManager.close(fh.getFileHandle()); } } - static class BufferCacheHeaderHelper { - private static final int FRAME_MULTIPLIER_OFF = 0; - private static final int EXTRA_BLOCK_PAGE_ID_OFF = FRAME_MULTIPLIER_OFF + 4; // 4 - - private final ByteBuffer buf; - private final ByteBuffer[] array; - - private BufferCacheHeaderHelper(int pageSize) { - buf = ByteBuffer.allocate(RESERVED_HEADER_BYTES + pageSize); - array = new ByteBuffer[] { buf, null }; - } - - private ByteBuffer[] prepareWrite(CachedPage cPage, ByteBuffer pageBuffer) { - buf.position(0); - buf.limit(RESERVED_HEADER_BYTES); - buf.putInt(FRAME_MULTIPLIER_OFF, cPage.getFrameSizeMultiplier()); - buf.putInt(EXTRA_BLOCK_PAGE_ID_OFF, cPage.getExtraBlockPageId()); - array[1] = pageBuffer; - return array; - } - - private ByteBuffer prepareRead() { - buf.position(0); - buf.limit(buf.capacity()); - return buf; - } - - private int processRead(CachedPage cPage) { - buf.position(RESERVED_HEADER_BYTES); - cPage.buffer.position(0); - cPage.buffer.put(buf); - int multiplier = buf.getInt(FRAME_MULTIPLIER_OFF); - cPage.setFrameSizeMultiplier(multiplier); - cPage.setExtraBlockPageId(buf.getInt(EXTRA_BLOCK_PAGE_ID_OFF)); - return multiplier; - } - } - @Override public void closeFileIfOpen(FileReference fileRef) { synchronized (fileInfoMap) { @@ -1508,4 +1414,14 @@ public class BufferCache implements IBufferCacheInternal, ILifeCycleComponent { } } + @Override + public ICompressedPageWriter getCompressedPageWriter(int fileId) { + final BufferedFileHandle fInfo; + synchronized (fileInfoMap) { + fInfo = fileInfoMap.get(fileId); + } + + return fInfo.getCompressedPageWriter(); + } + } \ No newline at end of file
