This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 43137d598aeddc42de33531f4556ae8950970bf4
Author: Ritik Raj <[email protected]>
AuthorDate: Wed Aug 20 21:48:39 2025 +0530

[ASTERIXDB-3636][STO] Add ColumnBufferPool to prevent OOM during high-volume column ingestion

- user model changes: yes
- storage format changes: no
- interface changes: yes

details:
Introduce a semaphore-based buffer pool to manage configurable column buffers (default: 4KB) and prevent excessive memory allocation during high ingestion rates or when processing datasets with large numbers of columns.

- Implement reservation-based flow control using Semaphore for buffer allocation
- Add buffer recycling to reuse existing buffers instead of creating new ones
- Configure memory limits based on percentage of total JVM memory
- Include timeout mechanisms for buffer reservation to prevent deadlocks
- Add logging for buffer pool health
- Prevent JVM OOM and reduce GC overhead from excessive buffer creation

Without pooling, each column's zeroth buffer (32KB) would be allocated independently, leading to memory exhaustion and GC pressure in scenarios with many columns or high ingestion throughput.

Ext-ref: MB-68059
Change-Id: I62437839d89b11d950e6715a00e844aedd0dab8e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20249
Tested-by: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../apache/asterix/app/nc/NCAppRuntimeContext.java | 22 +
 .../api/cluster_state_1/cluster_state_1.1.regexadm | 4 +
 .../cluster_state_1_full.1.regexadm | 4 +
 .../cluster_state_1_less.1.regexadm | 4 +
 .../out/AbstractMultiBufferBytesOutputStream.java | 10 +-
 .../out/MultiTemporaryBufferBytesOutputStream.java | 57 ++-
 .../operation/lsm/flush/FlushColumnMetadata.java | 6 +-
 .../lsm/flush/FlushColumnTupleWriter.java | 6 +
 .../lsm/merge/MergeColumnTupleWriter.java | 6 +
 .../lsm/merge/MergeColumnWriteMetadata.java | 2 +-
 .../column/common/buffer/NoOpWriteMultiPageOp.java | 10 +
 .../column/common/buffer/TestWriteMultiPageOp.java | 10 +
 .../asterix/common/api/INcApplicationContext.java | 3 +
 .../asterix/common/config/StorageProperties.java | 42 +-
 .../runtime/utils/RuntimeComponentsProvider.java | 6 +
 .../hyracks/control/common/config/OptionTypes.java | 94 +++-
 .../btree/helper/BTreeHelperStorageManager.java | 6 +
 .../examples/btree/helper/RuntimeContext.java | 9 +
 .../column/api/AbstractColumnTupleWriter.java | 5 +
 .../btree/column/api/IColumnWriteMultiPageOp.java | 11 +
 .../dataflow/LSMColumnBTreeLocalResource.java | 5 +-
 .../lsm/btree/column/impls/btree/ColumnBTree.java | 10 +-
 .../column/impls/btree/ColumnBTreeBulkloader.java | 315 +++++++++----
 .../column/impls/btree/ColumnBTreeFactory.java | 10 +-
 .../lsm/btree/column/impls/lsm/LSMColumnBTree.java | 22 +-
 .../lsm/btree/column/utils/LSMColumnBTreeUtil.java | 33 +-
 .../hyracks/storage/common/IStorageManager.java | 6 +
 .../common/buffercache/ColumnBufferPool.java | 243 ++++
 .../common/buffercache/FreeColumnBufferPool.java | 36 +-
 .../common/buffercache/IColumnBufferPool.java | 78 ++++
 .../hyracks/test/support/TestStorageManager.java | 6 +
 .../support/TestStorageManagerComponentHolder.java | 21 +
 .../storage/common/ColumnBufferPoolTest.java | 514 +++++++++++++++++++++
 33 files changed, 1448 insertions(+), 168 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 6416df9245..9d80525f20 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -117,11 +117,14 @@ import org.apache.hyracks.storage.am.lsm.common.impls.GreedyScheduler; import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.buffercache.BufferCache; import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy; +import org.apache.hyracks.storage.common.buffercache.ColumnBufferPool; import org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator; import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy; +import org.apache.hyracks.storage.common.buffercache.FreeColumnBufferPool; import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator; +import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool; import org.apache.hyracks.storage.common.buffercache.IDiskCachedPageAllocator; import org.apache.hyracks.storage.common.buffercache.IPageCleanerPolicy; import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy; @@ -145,6 +148,7 @@ import org.apache.logging.log4j.Logger; public class NCAppRuntimeContext implements INcApplicationContext { private static final Logger LOGGER = LogManager.getLogger(); + public static final double INVALID_BUFFER_POOL_MEMORY_PERCENTAGE = 0.0; private ILSMMergePolicyFactory metadataMergePolicyFactory; private final INCServiceContext ncServiceContext; @@ -162,6 +166,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { private ExecutorService threadExecutor; private IDatasetLifecycleManager datasetLifecycleManager; private IBufferCache bufferCache; + private IColumnBufferPool columnBufferPool; private IVirtualBufferCache virtualBufferCache; private ITransactionSubsystem txnSubsystem; private IMetadataNode metadataNodeStub; @@ -325,6 +330,17 @@ public class NCAppRuntimeContext implements INcApplicationContext { fileInfoMap, defaultContext); } + if (storageProperties.getColumnBufferPoolMemoryPercentage() <= INVALID_BUFFER_POOL_MEMORY_PERCENTAGE) { + LOGGER.info("Using FreeColumnBufferPool since column buffer pool size percentage is {}", + storageProperties.getColumnBufferSize()); + this.columnBufferPool = new FreeColumnBufferPool(); + } else { + this.columnBufferPool = new ColumnBufferPool(storageProperties.getColumnBufferSize(), + storageProperties.getColumnBufferPoolMaxSize(), + storageProperties.getColumnBufferPoolMemoryPercentage(), + storageProperties.getColumnBufferAcquireTimeout()); + } + if (cloudConfigurator != null) { diskCacheService = cloudConfigurator.createDiskCacheMonitoringService(getServiceContext(), bufferCache, fileInfoMap); @@ -347,6 +363,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { ILifeCycleComponentManager lccm = getServiceContext().getLifeCycleComponentManager(); lccm.register((ILifeCycleComponent) virtualBufferCache); lccm.register((ILifeCycleComponent) bufferCache); + lccm.register((ILifeCycleComponent) columnBufferPool); /* * LogManager must be stopped after RecoveryManager, DatasetLifeCycleManager, and ReplicationManager * to process any logs that might be generated during stopping these components @@ -404,6 +421,11 @@ public class NCAppRuntimeContext implements INcApplicationContext { return 
bufferCache; } + @Override + public IColumnBufferPool getColumnBufferPool() { + return columnBufferPool; + } + @Override public IVirtualBufferCache getVirtualBufferCache() { return virtualBufferCache; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm index 6c78c21b42..24f084d814 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm @@ -90,6 +90,10 @@ "replication\.timeout" : 120, "ssl\.enabled" : false, "storage.buffercache.pagesize" : 32768, + "storage.column.buffer.acquire.timeout" : 120000, + "storage.column.buffer.pool.max.size" : 8000, + "storage.column.buffer.pool.memory.percentage" : 3.0, + "storage.column.buffer.size" : 4096, "storage.column.free.space.tolerance" : 0.15, "storage.column.max.leaf.node.size" : 10485760, "storage.column.max.tuple.count" : 15000, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm index c44e600a3b..89aa04551b 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm @@ -90,6 +90,10 @@ "replication\.timeout" : 120, "ssl\.enabled" : false, "storage.buffercache.pagesize" : 32768, + "storage.column.buffer.acquire.timeout" : 120000, + "storage.column.buffer.pool.max.size" : 8000, + "storage.column.buffer.pool.memory.percentage" : 3.0, + "storage.column.buffer.size" : 4096, "storage.column.free.space.tolerance" : 0.15, "storage.column.max.leaf.node.size" : 10485760, "storage.column.max.tuple.count" : 15000, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm index 10eab67e07..1882daff53 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm @@ -90,6 +90,10 @@ "replication\.timeout" : 120, "ssl\.enabled" : false, "storage.buffercache.pagesize" : 32768, + "storage.column.buffer.acquire.timeout" : 120000, + "storage.column.buffer.pool.max.size" : 8000, + "storage.column.buffer.pool.memory.percentage" : 3.0, + "storage.column.buffer.size" : 4096, "storage.column.free.space.tolerance" : 0.15, "storage.column.max.leaf.node.size" : 10485760, "storage.column.max.tuple.count" : 15000, diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java index 0c311f1433..33cb5d86f2 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java +++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java @@ -47,7 +47,7 @@ abstract class AbstractMultiBufferBytesOutputStream extends AbstractBytesOutputS protected abstract void preReset() throws HyracksDataException; @Override - public final void reset() throws HyracksDataException { + public void reset() throws HyracksDataException { preReset(); position = 0; currentBufferIndex = 0; @@ -149,6 +149,7 @@ abstract class AbstractMultiBufferBytesOutputStream extends AbstractBytesOutputS @Override public final void finish() { currentBuf = null; + releaseColumnBuffer(); buffers.clear(); allocatedBytes = 0; } @@ -158,7 +159,7 @@ abstract class AbstractMultiBufferBytesOutputStream extends AbstractBytesOutputS * ************************************************* */ - private void ensureCapacity(int length) throws HyracksDataException { + protected void ensureCapacity(int length) throws HyracksDataException { if (position + length > allocatedBytes) { allocateMoreBuffers(length); } else if (length > 0) { @@ -195,4 +196,9 @@ abstract class AbstractMultiBufferBytesOutputStream extends AbstractBytesOutputS allocateBuffer(); } } + + protected void releaseColumnBuffer() { + // No-op by default + // Overridden in MultiTemporaryBufferBytesOutputStream + } } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java index 8988026ab9..6642cd8ef0 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java @@ -25,15 +25,23 @@ import java.nio.ByteBuffer; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp; -import org.apache.hyracks.util.StorageUtil; public final class MultiTemporaryBufferBytesOutputStream extends AbstractMultiBufferBytesOutputStream { - private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(32, StorageUtil.StorageUnit.KILOBYTE); + private boolean isFirstBufferFromColumnPool = false; public MultiTemporaryBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) { super(multiPageOpRef); } + // Don't initialize for temporary buffers, as they are confiscated from the pool + @Override + public void reset() throws HyracksDataException { + preReset(); + position = 0; + currentBufferIndex = 0; + releaseColumnBuffer(); + } + @Override protected void preReset() { //NoOp @@ -41,21 +49,60 @@ public final class MultiTemporaryBufferBytesOutputStream extends AbstractMultiBu @Override protected ByteBuffer confiscateNewBuffer() throws HyracksDataException { - if (buffers.isEmpty()) { + // When a buffer reset occurs, the 0th buffer is set to null. + // If buffers are not empty, this means a buffer from the column pool was previously used. + // If the 0th buffer is null, a new buffer must be confiscated. + if (buffers.isEmpty() || buffers.get(0) == null) { /* - * One buffer on the house to avoid confiscating a whole page for a tiny stream. + * One buffer from the pool to avoid confiscating a whole page for a tiny stream. 
* This protects pressuring the buffer cache from confiscating pages for small columns. Think sparse * columns, which may take only a few hundreds of bytes to write. */ - return ByteBuffer.allocate(INITIAL_BUFFER_SIZE); + isFirstBufferFromColumnPool = true; + return multiPageOpRef.getValue().confiscateTemporary0thBuffer(); } return multiPageOpRef.getValue().confiscateTemporary(); } + @Override + protected void ensureCapacity(int length) throws HyracksDataException { + ensure0thBufferAllocated(); + super.ensureCapacity(length); + } + + private void ensure0thBufferAllocated() throws HyracksDataException { + if ((currentBufferIndex == 0 && !isFirstBufferFromColumnPool)) { + // grab one from pool, as the buffers has already been reserved. + currentBuf = confiscateNewBuffer(); + boolean isBufferEmpty = buffers.isEmpty(); + currentBuf.clear(); + if (buffers.isEmpty()) { + buffers.add(currentBuf); + } else { + buffers.set(0, currentBuf); + } + if (isBufferEmpty) { + allocatedBytes += currentBuf.capacity(); + } + } + } + + @Override + protected void releaseColumnBuffer() { + if (!isFirstBufferFromColumnPool) { + return; + } + multiPageOpRef.getValue().releaseTemporary0thBuffer(buffers.get(0)); + isFirstBufferFromColumnPool = false; + buffers.set(0, null); + } + @Override public void writeTo(OutputStream outputStream) throws IOException { int writtenSize = 0; int numberOfUsedBuffers = currentBufferIndex + 1; + // This can be the case where the empty columns are being written + ensure0thBufferAllocated(); for (int i = 0; i < numberOfUsedBuffers; i++) { ByteBuffer buffer = buffers.get(i); outputStream.write(buffer.array(), 0, buffer.position()); diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java index b45d7d91de..21ad11b3c6 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java @@ -457,11 +457,13 @@ public class FlushColumnMetadata extends AbstractColumnMetadata { } public void close() { - //Dereference multiPageOp - multiPageOpRef.setValue(null); for (int i = 0; i < columnWriters.size(); i++) { columnWriters.get(i).close(); } + // In close, there is a rest call, where the buffer is being returned to @link ColumnBufferPool + // hence, we should dereference the multiPageOp after close(). 
+ //Dereference multiPageOp + multiPageOpRef.setValue(null); } protected void flushDefinitionLevels(int parentMask, int childMask, RunLengthIntArray parentDefLevels, diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java index b33a2b4c00..c0fd0493cb 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java @@ -200,6 +200,12 @@ public class FlushColumnTupleWriter extends AbstractColumnTupleWriter { writer.close(); } + @Override + public final void abort() { + // this call will reset the writers, releasing the 0th buffer back to the pool + columnMetadata.close(); + } + public void updateColumnMetadataForCurrentTuple(ITupleReference tuple) throws HyracksDataException { // Execution can reach here in case of Load statements // and the type of tuple in that case is PermutingFrameTupleReference diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java index 9feb789180..2c7ae4d9bf 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java @@ -243,6 +243,12 @@ public class MergeColumnTupleWriter extends AbstractColumnTupleWriter { writer.close(); } + @Override + public final void abort() { + // this call will reset the writers, releasing the 0th buffer back to the pool + columnMetadata.close(); + } + @Override public void reset() { } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java index b0d1a01015..9860822ec2 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java @@ -85,10 +85,10 @@ public final class MergeColumnWriteMetadata extends AbstractColumnImmutableMetad } public void close() { - multiPageOpRef.setValue(null); for (int i = 0; i < columnWriters.size(); i++) { columnWriters.get(i).close(); } + multiPageOpRef.setValue(null); } public static MergeColumnWriteMetadata create(ARecordType datasetType, ARecordType metaType, diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java index aa3cb71aaf..3a8304ec16 100644 --- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java +++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java @@ -44,6 +44,16 @@ public class NoOpWriteMultiPageOp implements IColumnWriteMultiPageOp { return null; } + @Override + public ByteBuffer confiscateTemporary0thBuffer() { + return null; + } + + 
@Override + public void releaseTemporary0thBuffer(ByteBuffer buffer) { + + } + @Override public void persist() throws HyracksDataException { diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java index 556a87c82f..fb19a7e624 100644 --- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java +++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java @@ -47,6 +47,16 @@ public class TestWriteMultiPageOp implements IColumnWriteMultiPageOp { return dummyBufferCache.allocateTemporary(); } + @Override + public ByteBuffer confiscateTemporary0thBuffer() { + return ByteBuffer.allocate(32 * 1024); // 32KB + } + + @Override + public void releaseTemporary0thBuffer(ByteBuffer buffer) { + + } + @Override public void persist() { //NoOp diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java index 888cea1186..6d4a428876 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java @@ -43,6 +43,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool; import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService; import org.apache.hyracks.storage.common.file.IResourceIdFactory; import org.apache.hyracks.util.cache.ICacheManager; @@ -69,6 +70,8 @@ public interface INcApplicationContext extends IApplicationContext { IBufferCache getBufferCache(); + IColumnBufferPool getColumnBufferPool(); + IVirtualBufferCache getVirtualBufferCache(); ILocalResourceRepository getLocalResourceRepository(); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java index c9090c63b3..435d8c2596 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java @@ -22,10 +22,13 @@ import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN; import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE; import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER; import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT; +import static org.apache.hyracks.control.common.config.OptionTypes.LONG; import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT; import static org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER; import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER; import static org.apache.hyracks.control.common.config.OptionTypes.STRING; +import static org.apache.hyracks.control.common.config.OptionTypes.getRangedDoubleType; 
+import static org.apache.hyracks.control.common.config.OptionTypes.getRangedIntegerType; import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE; import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE; @@ -71,7 +74,11 @@ public class StorageProperties extends AbstractProperties { STORAGE_FORMAT(STRING, "row"), STORAGE_PARTITIONING(STRING, "dynamic"), STORAGE_PARTITIONS_COUNT(INTEGER, 8), - STORAGE_MAX_COMPONENT_SIZE(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(1, StorageUtil.StorageUnit.TERABYTE)); + STORAGE_MAX_COMPONENT_SIZE(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(1, StorageUtil.StorageUnit.TERABYTE)), + STORAGE_COLUMN_BUFFER_SIZE(INTEGER_BYTE_UNIT, StorageUtil.getIntSizeInBytes(4, KILOBYTE)), + STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE(getRangedDoubleType(0.0, 100.0), 3.0d), + STORAGE_COLUMN_BUFFER_POOL_MAX_SIZE(getRangedIntegerType(1, 50000), 8000), + STORAGE_COLUMN_BUFFER_ACQUIRE_TIMEOUT(LONG, TimeUnit.SECONDS.toMillis(120)); private final IOptionType interpreter; private final Object defaultValue; @@ -95,6 +102,10 @@ public class StorageProperties extends AbstractProperties { case STORAGE_COLUMN_MAX_TUPLE_COUNT: case STORAGE_COLUMN_FREE_SPACE_TOLERANCE: case STORAGE_COLUMN_MAX_LEAF_NODE_SIZE: + case STORAGE_COLUMN_BUFFER_SIZE: + case STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE: + case STORAGE_COLUMN_BUFFER_POOL_MAX_SIZE: + case STORAGE_COLUMN_BUFFER_ACQUIRE_TIMEOUT: return Section.COMMON; default: return Section.NC; @@ -163,6 +174,14 @@ public class StorageProperties extends AbstractProperties { + " changed after any dataset has been created"; case STORAGE_MAX_COMPONENT_SIZE: return "The resultant disk component after a merge must not exceed the specified maximum size."; + case STORAGE_COLUMN_BUFFER_SIZE: + return "The size in bytes of each buffer in the column buffer pool."; + case STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE: + return "The percentage of the JVM memory allocated to the column buffer pool. 
This controls the number of the column buffer relative to the total JVM memory."; + case STORAGE_COLUMN_BUFFER_POOL_MAX_SIZE: + return "The maximum number of buffers in the column buffer pool."; + case STORAGE_COLUMN_BUFFER_ACQUIRE_TIMEOUT: + return "The maximum time in milliseconds to wait for acquiring a buffer from the column buffer pool."; default: throw new IllegalStateException("NYI: " + this); } @@ -233,6 +252,22 @@ public class StorageProperties extends AbstractProperties { return accessor.getDouble(Option.STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE); } + public int getColumnBufferSize() { + return accessor.getInt(Option.STORAGE_COLUMN_BUFFER_SIZE); + } + + public double getColumnBufferPoolMemoryPercentage() { + return accessor.getDouble(Option.STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE); + } + + public int getColumnBufferPoolMaxSize() { + return accessor.getInt(Option.STORAGE_COLUMN_BUFFER_POOL_MAX_SIZE); + } + + public long getColumnBufferAcquireTimeout() { + return accessor.getLong(Option.STORAGE_COLUMN_BUFFER_ACQUIRE_TIMEOUT); + } + public int getBufferCacheNumPages() { return (int) (getBufferCacheSize() / (getBufferCachePageSize() + IBufferCache.RESERVED_HEADER_BYTES)); } @@ -248,6 +283,11 @@ public class StorageProperties extends AbstractProperties { int maxConcurrentMerges = getMaxConcurrentMerges(numPartitions); int maxConcurrentFlushes = getMaxConcurrentFlushes(numPartitions); int writeBufferSize = runtimeContext.getCloudProperties().getWriteBufferSize(); + + double columnBufferPoolSizePercentage = + accessor.getDouble(Option.STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE); + long maxMemoryForColumnBuffers = (long) (MAX_HEAP_BYTES * (columnBufferPoolSizePercentage / 100)); + jobExecutionMemory -= maxMemoryForColumnBuffers; jobExecutionMemory -= (long) (maxConcurrentFlushes + maxConcurrentMerges) * writeBufferSize; } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java index 620417be19..6c60bb522f 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java @@ -30,6 +30,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProv import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.IStorageManager; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool; import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService; import org.apache.hyracks.storage.common.file.IResourceIdFactory; @@ -59,6 +60,11 @@ public class RuntimeComponentsProvider implements IStorageManager, ILSMIOOperati return getAppCtx(ctx).getBufferCache(); } + @Override + public IColumnBufferPool getColumnBufferPool(INCServiceContext ctx) { + return getAppCtx(ctx).getColumnBufferPool(); + } + @Override public ILocalResourceRepository getLocalResourceRepository(INCServiceContext ctx) { return getAppCtx(ctx).getLocalResourceRepository(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java index 5970abb4c4..ff0288bb99 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java @@ -73,27 +73,7 @@ public class OptionTypes { public static final IOptionType<Integer> INTEGER = new IntegerOptionType(); - public static final IOptionType<Double> DOUBLE = new IOptionType<Double>() { - @Override - public Double parse(String s) { - return Double.parseDouble(s); - } - - @Override - public Double parse(JsonNode node) { - return node.isNull() ? null : node.asDouble(); - } - - @Override - public Class<Double> targetType() { - return Double.class; - } - - @Override - public void serializeJSONField(String fieldName, Object value, ObjectNode node) { - node.put(fieldName, (double) value); - } - }; + public static final IOptionType<Double> DOUBLE = new DoubleOptionType(); public static final IOptionType<String> STRING = new IOptionType<String>() { @Override @@ -260,6 +240,11 @@ public class OptionTypes { return new RangedIntegerOptionType(minValueInclusive, maxValueInclusive); } + public static IOptionType<Double> getRangedDoubleType(final double minValueInclusive, + final double maxValueInclusive) { + return new RangedDoubleOptionType(minValueInclusive, maxValueInclusive); + } + public static class IntegerOptionType implements IOptionType<Integer> { @Override public Integer parse(String s) { @@ -432,6 +417,73 @@ public class OptionTypes { } } + private static class DoubleOptionType implements IOptionType<Double> { + @Override + public Double parse(String s) { + return Double.parseDouble(s); + } + + @Override + public Double parse(JsonNode node) { + return node.isNull() ? 
null : node.asDouble(); + } + + @Override + public Class<Double> targetType() { + return Double.class; + } + + @Override + public void serializeJSONField(String fieldName, Object value, ObjectNode node) { + node.put(fieldName, (double) value); + } + } + + private static class RangedDoubleOptionType extends DoubleOptionType { + private final double minValue; + private final double maxValue; + + RangedDoubleOptionType(double minValue, double maxValue) { + this.minValue = minValue; + this.maxValue = maxValue; + } + + @Override + public Double parse(String value) { + double doubleValue = super.parse(value); + rangeCheck(doubleValue); + return doubleValue; + } + + @Override + public Double parse(JsonNode node) { + if (node.isNull()) { + return null; + } + double doubleValue = node.asDouble(); + rangeCheck(doubleValue); + return doubleValue; + } + + void rangeCheck(double doubleValue) { + if (Double.isNaN(doubleValue)) { + throw new IllegalArgumentException("double value cannot be NaN"); + } + if (doubleValue < minValue || doubleValue > maxValue) { + if (maxValue == Double.MAX_VALUE) { + if (minValue == 0.0) { + throw new IllegalArgumentException("double value must not be negative, but was " + doubleValue); + } else if (minValue == Double.MIN_VALUE) { + throw new IllegalArgumentException( + "double value must be greater than " + Double.MIN_VALUE + ", but was " + doubleValue); + } + } + throw new IllegalArgumentException("double value must be between " + minValue + "-" + maxValue + + " (inclusive), but was " + doubleValue); + } + } + } + public static final IOptionType<Long> getRangedLongByteUnit(long min, long max) { return new LongByteUnit(min, max); } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java index 426e81db8f..487652e31a 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java @@ -26,6 +26,7 @@ import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.IResourceLifecycleManager; import org.apache.hyracks.storage.common.IStorageManager; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool; import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService; import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService; import org.apache.hyracks.storage.common.file.ResourceIdFactory; @@ -48,6 +49,11 @@ public class BTreeHelperStorageManager implements IStorageManager { return RuntimeContext.get(ctx).getBufferCache(); } + @Override + public IColumnBufferPool getColumnBufferPool(INCServiceContext ctx) { + return RuntimeContext.get(ctx).getColumnBufferPool(); + } + @Override public ILocalResourceRepository getLocalResourceRepository(INCServiceContext ctx) { return RuntimeContext.get(ctx).getLocalResourceRepository(); diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java index f5edc7469c..08a1dec947 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java @@ -21,6 +21,7 @@ package org.apache.hyracks.examples.btree.helper; import java.util.HashMap; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -31,11 +32,13 @@ import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.IResourceLifecycleManager; import org.apache.hyracks.storage.common.buffercache.BufferCache; import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy; +import org.apache.hyracks.storage.common.buffercache.ColumnBufferPool; import org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator; import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy; import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator; +import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool; import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy; import org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider; import org.apache.hyracks.storage.common.file.FileMapManager; @@ -51,6 +54,7 @@ import org.apache.hyracks.util.annotations.TestOnly; public class RuntimeContext { private final IIOManager ioManager; private final IBufferCache bufferCache; + private final IColumnBufferPool columnBufferPool; private final IFileMapManager fileMapManager; private final ILocalResourceRepository localResourceRepository; private final IResourceLifecycleManager<IIndex> lcManager; @@ -65,6 +69,7 @@ public class RuntimeContext { this.ioManager = appCtx.getIoManager(); bufferCache = new BufferCache(ioManager, prs, new DelayPageCleanerPolicy(1000), fileMapManager, 100, 10, threadFactory, new HashMap<>(), DefaultBufferCacheReadContextProvider.DEFAULT); + columnBufferPool = new ColumnBufferPool(4 * 1024, 500, 3.0, TimeUnit.MINUTES.toMillis(2)); ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory(); localResourceRepository = localResourceRepositoryFactory.createRepository(); resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory(); @@ -83,6 +88,10 @@ public class RuntimeContext { return bufferCache; } + public IColumnBufferPool getColumnBufferPool() { + return columnBufferPool; + } + public IFileMapProvider getFileMapManager() { return fileMapManager; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java index 60f7967cbd..fa1d9c4167 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java @@ -96,6 +96,11 @@ public abstract class AbstractColumnTupleWriter extends AbstractTupleWriterDisab */ public abstract void close(); + /** + * On abort, release all allocated temporary buffers + */ + public abstract void abort(); + /** * reset the state after flush */ diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java index b46b67d097..a54a428201 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java @@ -54,4 +54,15 @@ public interface IColumnWriteMultiPageOp { * @return a {@link IBufferCache}-backed buffer for temporary use */ ByteBuffer confiscateTemporary() throws HyracksDataException; + + /** + * @return a {@link org.apache.hyracks.storage.common.buffercache.ColumnBufferPool} - backed first buffer for temporary use + */ + ByteBuffer confiscateTemporary0thBuffer(); + + /** + * release the acquired temporary buffer back to the {@link org.apache.hyracks.storage.common.buffercache.ColumnBufferPool} + * @param buffer + */ + void releaseTemporary0thBuffer(ByteBuffer buffer); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java index 65e19bbe20..ed8c4f16b7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java @@ -93,8 +93,9 @@ public class LSMColumnBTreeLocalResource extends LSMBTreeLocalResource { pageWriteCallbackFactory.initialize(serviceCtx, this); IDiskCacheMonitoringService diskCacheService = storageManager.getDiskCacheMonitoringService(serviceCtx); return LSMColumnBTreeUtil.createLSMTree(config, ioManager, vbcs, file, - storageManager.getBufferCache(serviceCtx), typeTraits, cmpFactories, bloomFilterKeyFields, - bloomFilterFalsePositiveRate, mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx), + storageManager.getBufferCache(serviceCtx), storageManager.getColumnBufferPool(serviceCtx), typeTraits, + cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, + mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx), opTrackerProvider.getOperationTracker(serviceCtx, this), ioSchedulerProvider.getIoScheduler(serviceCtx), ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields, metadataPageManagerFactory, 
false, serviceCtx.getTracer(), compressorDecompressorFactory, nullTypeTraits, nullIntrospector, diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTree.java index a863d13b9f..c791d00d28 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTree.java @@ -36,14 +36,18 @@ import org.apache.hyracks.storage.common.IIndexBulkLoader; import org.apache.hyracks.storage.common.IIndexCursorStats; import org.apache.hyracks.storage.common.NoOpIndexCursorStats; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool; import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback; import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext; public class ColumnBTree extends DiskBTree { - public ColumnBTree(IBufferCache bufferCache, IPageManager freePageManager, + private final IColumnBufferPool columnBufferPool; + + public ColumnBTree(IBufferCache bufferCache, IColumnBufferPool columnBufferPool, IPageManager freePageManager, ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory leafFrameFactory, IBinaryComparatorFactory[] cmpFactories, int fieldCount, FileReference file) { super(bufferCache, freePageManager, interiorFrameFactory, leafFrameFactory, cmpFactories, fieldCount, file); + this.columnBufferPool = columnBufferPool; } @Override @@ -73,6 +77,10 @@ public class ColumnBTree extends DiskBTree { return new ColumnBTreeAccessor(this, iap, index, projectionInfo, context); } + public IColumnBufferPool getColumnBufferPool() { + return columnBufferPool; + } + public class ColumnBTreeAccessor extends DiskBTreeAccessor { private final int index; private final IColumnProjectionInfo projectionInfo; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java index 244cf47601..0769824425 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java @@ -38,6 +38,7 @@ import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnW import org.apache.hyracks.storage.common.buffercache.CachedPage; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICachedPage; +import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool; import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback; import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext; import 
org.apache.hyracks.storage.common.file.BufferedFileHandle; @@ -58,8 +59,12 @@ public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements I private final IColumnWriteContext columnWriteContext; private final int maxColumnsInPageZerothSegment; private final IColumnPageZeroWriter.ColumnPageZeroWriterType pageZeroWriterType; + private final IColumnBufferPool columnBufferPool; + private boolean writerInitialized = false; private boolean setLowKey; private int tupleCount; + // reservedBufferCount tracks the number of column buffers currently held by this bulkloader. + private int reservedBufferCount; // For logging private int numberOfLeafNodes; @@ -76,13 +81,14 @@ public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements I IPageWriteCallback callback, ITreeIndex index, ITreeIndexFrame leafFrame, IBufferCacheWriteContext writeContext) throws HyracksDataException { super(fillFactor, verifyInput, callback, index, leafFrame, writeContext); + columnBufferPool = ((ColumnBTree) index).getColumnBufferPool(); columnsPages = new ArrayList<>(); pageZeroSegments = new ArrayList<>(); tempConfiscatedPages = new ArrayList<>(); columnWriteContext = (IColumnWriteContext) writeContext; columnarFrame = (ColumnBTreeWriteLeafFrame) leafFrame; columnWriter = columnarFrame.getColumnTupleWriter(); - columnWriter.init(this); + reservedBufferCount = 0; lowKey = new BTreeSplitKey(tupleWriter.createTupleReference()); lowKey.getTuple().setFieldCount(cmp.getKeyFieldCount()); setLowKey = true; @@ -106,7 +112,10 @@ public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements I @Override public void add(ITupleReference tuple) throws HyracksDataException { - if (isFull(tuple)) { + // track the number of columns in the current tuple + columnWriter.updateColumnMetadataForCurrentTuple(tuple); + ensureWritersInitialized(); + if (isLeafPageFull(tuple)) { writeFullLeafPage(); confiscateNewLeafPage(); } @@ -120,33 +129,124 @@ public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements I tupleCount++; } + /** + * Ensures that the column writers are initialized and the correct number of column buffers are reserved. + * This method is called before any tuples are added. + */ + private void ensureWritersInitialized() throws HyracksDataException { + if (!writerInitialized) { + writerInitialized = true; + int numberOfColumns = columnWriter.getAbsoluteNumberOfColumns(true); + reserveBuffers(numberOfColumns); + reservedBufferCount = numberOfColumns; + columnWriter.init(this); + } + } + + /** + * Reserves the specified number of column buffers from the columnBufferPool. + * If reservation fails, releases any already held buffers and throws an exception. + */ + private void reserveBuffers(int numBuffers) throws HyracksDataException { + try { + columnBufferPool.reserve(numBuffers); + } catch (Exception e) { + // Release any already held buffers before throwing + releaseBuffers(); + throw HyracksDataException.create(e); + } + } + + private void releaseBuffers() { + if (reservedBufferCount > 0) { + columnBufferPool.release(reservedBufferCount); + reservedBufferCount = 0; + } + } + @Override protected ITreeIndexTupleReference createTupleReference() { return tupleWriter.createTupleReference(); } - private boolean isFull(ITupleReference tuple) throws HyracksDataException { + /** + * Determines if the current leaf page is full and needs to be flushed. + * @param tuple The tuple to be added. + * @return true if the page is full, false otherwise. 
+ */ + private boolean isLeafPageFull(ITupleReference tuple) throws HyracksDataException { if (tupleCount == 0) { - columnWriter.updateColumnMetadataForCurrentTuple(tuple); - // this is for non-adaptive case. - columnWriter.setWriterType(pageZeroWriterType); - return false; - } else if (tupleCount >= columnWriter.getMaxNumberOfTuples()) { - //We reached the maximum number of tuples + return handleFirstTuple(); + } + if (tupleCount >= columnWriter.getMaxNumberOfTuples()) { + // Maximum tuple count reached for this leaf page. return true; } - //Columns' Offsets - columnWriter.updateColumnMetadataForCurrentTuple(tuple); - int requiredFreeSpace = columnWriter.getPageZeroWriterOccupiedSpace(maxColumnsInPageZerothSegment, + if (canCurrentTupleFit(tuple)) { + int requiredColumns = columnWriter.getAbsoluteNumberOfColumns(true); + return !ensureSufficientBuffers(requiredColumns); + } + // Not enough space for the tuple. + return true; + } + + /** + * Ensures that we have enough column buffers for the required number of columns. + * Attempts to reserve more buffers if needed. + * @param requiredColumns The number of columns needed for the current tuple. + * @return true if sufficient buffers are available or successfully reserved, false otherwise. + */ + private boolean ensureSufficientBuffers(int requiredColumns) throws HyracksDataException { + if (reservedBufferCount < requiredColumns) { + int additionalBuffersNeeded = requiredColumns - reservedBufferCount; + if (additionalBuffersNeeded > 0) { + // Try to reserve additional buffers + boolean reserved = columnBufferPool.tryReserve(additionalBuffersNeeded); + if (reserved) { + reservedBufferCount += additionalBuffersNeeded; + return true; + } + // Not enough buffers available, do not update reservedBufferCount + return false; + } + // If additionalBuffersNeeded == 0, we already have enough + return true; + } + return true; + } + + /** + * Checks if the current tuple can fit in the current leaf page based on space requirements. + * @param tuple The tuple to be added. + * @return true if the tuple can fit, false otherwise. + */ + private boolean canCurrentTupleFit(ITupleReference tuple) { + int requiredSpace = columnWriter.getPageZeroWriterOccupiedSpace(maxColumnsInPageZerothSegment, columnarFrame.getBuffer().capacity(), true, pageZeroWriterType); - //Occupied space from previous writes - requiredFreeSpace += columnWriter.getPrimaryKeysEstimatedSize(); - //min and max tuples' sizes - requiredFreeSpace += lowKey.getTuple().getTupleSize() + getSplitKeySize(tuple); - //New tuple required space - requiredFreeSpace += columnWriter.bytesRequired(tuple); - lastRequiredFreeSpace = requiredFreeSpace; - return bufferCache.getPageSize() <= requiredFreeSpace; + requiredSpace += columnWriter.getPrimaryKeysEstimatedSize(); + requiredSpace += lowKey.getTuple().getTupleSize() + getSplitKeySize(tuple); + requiredSpace += columnWriter.bytesRequired(tuple); + lastRequiredFreeSpace = requiredSpace; + + // If the page has enough space, check if we have enough buffers. + // If not, we need to flush to avoid deadlock on column buffers. + return bufferCache.getPageSize() > requiredSpace; + } + + /** + * Handles setup for the very first tuple in the leaf page. + * Reserves the required number of column buffers and sets the writer type. + * @return false, as the first tuple always fits after setup. 
+ */ + private boolean handleFirstTuple() throws HyracksDataException { + int requiredColumns = columnWriter.getAbsoluteNumberOfColumns(true); + int additionalBuffersNeeded = requiredColumns - reservedBufferCount; + if (additionalBuffersNeeded > 0) { + reserveBuffers(additionalBuffersNeeded); + reservedBufferCount += additionalBuffersNeeded; + } + columnWriter.setWriterType(pageZeroWriterType); + return false; } private void setMinMaxKeys(ITupleReference tuple) { @@ -160,80 +260,91 @@ public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements I @Override public void end() throws HyracksDataException { - if (tupleCount > 0) { - splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0); - try { - columnarFrame.flush(columnWriter, tupleCount, maxColumnsInPageZerothSegment, lowKey.getTuple(), - splitKey.getTuple(), this); - updatePageLayoutStat(); - } catch (Exception e) { - logState(e); - throw e; + try { + ensureWritersInitialized(); + if (tupleCount > 0) { + splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0); + try { + columnarFrame.flush(columnWriter, tupleCount, maxColumnsInPageZerothSegment, lowKey.getTuple(), + splitKey.getTuple(), this); + updatePageLayoutStat(); + } catch (Exception e) { + logState(e); + throw e; + } + } + columnWriter.close(); + //We are done, return any temporary confiscated pages + for (ICachedPage page : tempConfiscatedPages) { + bufferCache.returnPage(page, false); } - } - columnWriter.close(); - //We are done, return any temporary confiscated pages - for (ICachedPage page : tempConfiscatedPages) { - bufferCache.returnPage(page, false); - } - // For logging - int numberOfTempConfiscatedPages = tempConfiscatedPages.size(); - tempConfiscatedPages.clear(); - //Where Page0 and columns pages will be written - super.end(); + // For logging + int numberOfTempConfiscatedPages = tempConfiscatedPages.size(); + tempConfiscatedPages.clear(); + //Where Page0 and columns pages will be written + super.end(); - log("Finished", numberOfTempConfiscatedPages); + log("Finished", numberOfTempConfiscatedPages); + } finally { + // Release all column buffers held by this bulkloader + releaseBuffers(); + } } @Override protected void writeFullLeafPage() throws HyracksDataException { - NodeFrontier leafFrontier = nodeFrontiers.get(0); - splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0); - splitKey.setLeftPage(leafFrontier.pageId); - if (tupleCount > 0) { - //We need to flush columns to confiscate all columns pages first before calling propagateBulk - try { - columnarFrame.flush(columnWriter, tupleCount, maxColumnsInPageZerothSegment, lowKey.getTuple(), - splitKey.getTuple(), this); - updatePageLayoutStat(); - } catch (Exception e) { - logState(e); - throw e; + try { + NodeFrontier leafFrontier = nodeFrontiers.get(0); + splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0); + splitKey.setLeftPage(leafFrontier.pageId); + if (tupleCount > 0) { + //We need to flush columns to confiscate all columns pages first before calling propagateBulk + try { + columnarFrame.flush(columnWriter, tupleCount, maxColumnsInPageZerothSegment, lowKey.getTuple(), + splitKey.getTuple(), this); + updatePageLayoutStat(); + } catch (Exception e) { + logState(e); + throw e; + } } - } - - propagateBulk(1, pagesToWrite); - //Take a page for the next leaf - leafFrontier.pageId = freePageManager.takePage(metaFrame); - columnarFrame.setNextLeaf(leafFrontier.pageId); + propagateBulk(1, pagesToWrite); - /* - * Write columns' pages 
first to ensure they (columns' pages) are written before pageZero. - * It ensures pageZero does not land in between columns' pages if compression is enabled - */ - writeColumnAndSegmentPages(); - //Then write page0 - write(leafFrontier.page); + //Take a page for the next leaf + leafFrontier.pageId = freePageManager.takePage(metaFrame); + columnarFrame.setNextLeaf(leafFrontier.pageId); - //Write interior nodes after writing columns pages - for (ICachedPage c : pagesToWrite) { - write(c); - } + /* + * Write columns' pages first to ensure they (columns' pages) are written before pageZero. + * It ensures pageZero does not land in between columns' pages if compression is enabled + */ + writeColumnAndSegmentPages(); + //Then write page0 + write(leafFrontier.page); - // For logging - maxNumberOfPagesInALeafNode = Math.max(maxNumberOfPagesInALeafNode, numberOfPagesInCurrentLeafNode); - maxTupleCount = Math.max(maxTupleCount, tupleCount); - // Starts with 1 for page0 - numberOfPagesInCurrentLeafNode = 1; - numberOfLeafNodes++; + //Write interior nodes after writing columns pages + for (ICachedPage c : pagesToWrite) { + write(c); + } - // Clear for next page - pagesToWrite.clear(); - splitKey.setRightPage(leafFrontier.pageId); - setLowKey = true; - tupleCount = 0; + // For logging + maxNumberOfPagesInALeafNode = Math.max(maxNumberOfPagesInALeafNode, numberOfPagesInCurrentLeafNode); + maxTupleCount = Math.max(maxTupleCount, tupleCount); + // Starts with 1 for page0 + numberOfPagesInCurrentLeafNode = 1; + numberOfLeafNodes++; + + // Clear for next page + pagesToWrite.clear(); + splitKey.setRightPage(leafFrontier.pageId); + setLowKey = true; + tupleCount = 0; + } finally { + // Release all column buffers held by this bulkloader for this leaf + releaseBuffers(); + } } @Override @@ -295,21 +406,29 @@ public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements I @Override public void abort() throws HyracksDataException { - for (ICachedPage page : columnsPages) { - bufferCache.returnPage(page, false); - } + try { + for (ICachedPage page : columnsPages) { + bufferCache.returnPage(page, false); + } - for (ICachedPage page : pageZeroSegments) { - bufferCache.returnPage(page, false); - } + for (ICachedPage page : pageZeroSegments) { + bufferCache.returnPage(page, false); + } - for (ICachedPage page : tempConfiscatedPages) { - bufferCache.returnPage(page, false); - } - super.abort(); + for (ICachedPage page : tempConfiscatedPages) { + bufferCache.returnPage(page, false); + } + super.abort(); + // Release the acquired column buffer back to the pool + columnWriter.abort(); - // For logging - log("Aborted", tempConfiscatedPages.size()); + // For logging + log("Aborted", tempConfiscatedPages.size()); + } finally { + + // Release all column buffers held by this bulkloader + releaseBuffers(); + } } private void setSplitKey(ISplitKey splitKey, ITupleReference tuple) { @@ -375,6 +494,16 @@ public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements I return page.getBuffer(); } + @Override + public ByteBuffer confiscateTemporary0thBuffer() { + return columnBufferPool.getBuffer(); + } + + @Override + public void releaseTemporary0thBuffer(ByteBuffer buffer) { + columnBufferPool.recycle(buffer); + } + private void logState(Exception e) { try { ObjectNode state = JSONUtil.createObject(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeFactory.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeFactory.java index 1b9e198260..09a79a8d20 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeFactory.java @@ -26,18 +26,22 @@ import org.apache.hyracks.storage.am.common.api.IPageManagerFactory; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool; public class ColumnBTreeFactory extends TreeIndexFactory<ColumnBTree> { - public ColumnBTreeFactory(IIOManager ioManager, IBufferCache bufferCache, + private final IColumnBufferPool columnBufferPool; + + public ColumnBTreeFactory(IIOManager ioManager, IBufferCache bufferCache, IColumnBufferPool columnBufferPool, IPageManagerFactory freePageManagerFactory, ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory leafFrameFactory, IBinaryComparatorFactory[] cmpFactories, int fieldCount) { super(ioManager, bufferCache, freePageManagerFactory, interiorFrameFactory, leafFrameFactory, cmpFactories, fieldCount); + this.columnBufferPool = columnBufferPool; } @Override public ColumnBTree createIndexInstance(FileReference file) throws HyracksDataException { - return new ColumnBTree(bufferCache, freePageManagerFactory.createPageManager(bufferCache), interiorFrameFactory, - leafFrameFactory, cmpFactories, fieldCount, file); + return new ColumnBTree(bufferCache, columnBufferPool, freePageManagerFactory.createPageManager(bufferCache), + interiorFrameFactory, leafFrameFactory, cmpFactories, fieldCount, file); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java index 6050749443..022cb38b6a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java @@ -49,6 +49,7 @@ import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor.ICurs import org.apache.hyracks.storage.common.IIndexAccessParameters; import org.apache.hyracks.storage.common.IIndexCursorStats; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool; import org.apache.hyracks.util.trace.ITracer; public class LSMColumnBTree extends LSMBTree { @@ -56,6 +57,7 @@ public class LSMColumnBTree extends LSMBTree { private final IColumnManager columnManager; private final IColumnIndexDiskCacheManager diskCacheManager; private final ILSMDiskComponentFactory mergeComponentFactory; + private final IColumnBufferPool columnBufferPool; /** * This column metadata only used during flush and dataset bulkload operations. 
We cannot have more than one * thread to do a flush/dataset bulkload. Do not use it for search/scan. Instead, use the latest component @@ -68,13 +70,14 @@ public class LSMColumnBTree extends LSMBTree { public LSMColumnBTree(NCConfig storageConfig, IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches, ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory insertLeafFrameFactory, ITreeIndexFrameFactory deleteLeafFrameFactory, IBufferCache diskBufferCache, - ILSMIndexFileManager fileManager, ILSMDiskComponentFactory componentFactory, - ILSMDiskComponentFactory mergeComponentFactory, ILSMDiskComponentFactory bulkloadComponentFactory, - double bloomFilterFalsePositiveRate, int fieldCount, IBinaryComparatorFactory[] cmpFactories, - ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, - ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory, - int[] btreeFields, ITracer tracer, IColumnManager columnManager, boolean atomic, - IColumnIndexDiskCacheManager diskCacheManager) throws HyracksDataException { + IColumnBufferPool columnBufferPool, ILSMIndexFileManager fileManager, + ILSMDiskComponentFactory componentFactory, ILSMDiskComponentFactory mergeComponentFactory, + ILSMDiskComponentFactory bulkloadComponentFactory, double bloomFilterFalsePositiveRate, int fieldCount, + IBinaryComparatorFactory[] cmpFactories, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, + ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, + ILSMPageWriteCallbackFactory pageWriteCallbackFactory, int[] btreeFields, ITracer tracer, + IColumnManager columnManager, boolean atomic, IColumnIndexDiskCacheManager diskCacheManager) + throws HyracksDataException { super(storageConfig, ioManager, virtualBufferCaches, interiorFrameFactory, insertLeafFrameFactory, deleteLeafFrameFactory, diskBufferCache, fileManager, componentFactory, bulkloadComponentFactory, null, null, null, bloomFilterFalsePositiveRate, fieldCount, cmpFactories, mergePolicy, opTracker, ioScheduler, @@ -83,6 +86,7 @@ public class LSMColumnBTree extends LSMBTree { this.columnManager = columnManager; this.mergeComponentFactory = mergeComponentFactory; this.diskCacheManager = diskCacheManager; + this.columnBufferPool = columnBufferPool; } @Override @@ -146,6 +150,10 @@ public class LSMColumnBTree extends LSMBTree { return CURSOR_FACTORY; } + public IColumnBufferPool getColumnBufferPool() { + return columnBufferPool; + } + @Override public IColumnIndexDiskCacheManager getDiskCacheManager() { return diskCacheManager; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java index c8815ae37b..a41561024e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java @@ -57,6 +57,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; import 
org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool; import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService; import org.apache.hyracks.util.trace.ITracer; @@ -64,11 +65,11 @@ public class LSMColumnBTreeUtil { public static LSMBTree createLSMTree(NCConfig storageConfig, IIOManager ioManager, List<IVirtualBufferCache> virtualBufferCaches, FileReference file, IBufferCache diskBufferCache, - ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories, int[] bloomFilterKeyFields, - double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, - ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallbackFactory ioOpCallbackFactory, - ILSMPageWriteCallbackFactory pageWriteCallbackFactory, int[] btreeFields, - IMetadataPageManagerFactory freePageManagerFactory, boolean updateAware, ITracer tracer, + IColumnBufferPool columnBufferPool, ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories, + int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, + ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler, + ILSMIOOperationCallbackFactory ioOpCallbackFactory, ILSMPageWriteCallbackFactory pageWriteCallbackFactory, + int[] btreeFields, IMetadataPageManagerFactory freePageManagerFactory, boolean updateAware, ITracer tracer, ICompressorDecompressorFactory compressorDecompressorFactory, ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector, IColumnManagerFactory columnManagerFactory, boolean atomic, IDiskCacheMonitoringService diskCacheService) throws HyracksDataException { @@ -104,13 +105,15 @@ public class LSMColumnBTreeUtil { ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(insertTupleWriterFactory); // BTree factory - TreeIndexFactory<ColumnBTree> flushBTreeFactory = new ColumnBTreeFactory(ioManager, diskBufferCache, - freePageManagerFactory, interiorFrameFactory, flushLeafFrameFactory, cmpFactories, typeTraits.length); - TreeIndexFactory<ColumnBTree> mergeBTreeFactory = new ColumnBTreeFactory(ioManager, diskBufferCache, - freePageManagerFactory, interiorFrameFactory, mergeLeafFrameFactory, cmpFactories, typeTraits.length); + TreeIndexFactory<ColumnBTree> flushBTreeFactory = + new ColumnBTreeFactory(ioManager, diskBufferCache, columnBufferPool, freePageManagerFactory, + interiorFrameFactory, flushLeafFrameFactory, cmpFactories, typeTraits.length); + TreeIndexFactory<ColumnBTree> mergeBTreeFactory = + new ColumnBTreeFactory(ioManager, diskBufferCache, columnBufferPool, freePageManagerFactory, + interiorFrameFactory, mergeLeafFrameFactory, cmpFactories, typeTraits.length); TreeIndexFactory<ColumnBTree> bulkloadBTreeFactory = - new ColumnBTreeFactory(ioManager, diskBufferCache, freePageManagerFactory, interiorFrameFactory, - bulkLoadLeafFrameFactory, cmpFactories, typeTraits.length); + new ColumnBTreeFactory(ioManager, diskBufferCache, columnBufferPool, freePageManagerFactory, + interiorFrameFactory, bulkLoadLeafFrameFactory, cmpFactories, typeTraits.length); ILSMIndexFileManager fileNameManager = new LSMBTreeFileManager(ioManager, file, flushBTreeFactory, true, compressorDecompressorFactory, diskCacheEnabled); @@ -124,9 +127,9 @@ public class LSMColumnBTreeUtil { new LSMColumnBTreeWithBloomFilterDiskComponentFactory(bulkloadBTreeFactory, bloomFilterFactory); return new 
LSMColumnBTree(storageConfig, ioManager, virtualBufferCaches, interiorFrameFactory, - insertLeafFrameFactory, deleteLeafFrameFactory, diskBufferCache, fileNameManager, flushComponentFactory, - mergeComponentFactory, bulkLoadComponentFactory, bloomFilterFalsePositiveRate, typeTraits.length, - cmpFactories, mergePolicy, opTracker, ioScheduler, ioOpCallbackFactory, pageWriteCallbackFactory, - btreeFields, tracer, columnManager, atomic, diskCacheManager); + insertLeafFrameFactory, deleteLeafFrameFactory, diskBufferCache, columnBufferPool, fileNameManager, + flushComponentFactory, mergeComponentFactory, bulkLoadComponentFactory, bloomFilterFalsePositiveRate, + typeTraits.length, cmpFactories, mergePolicy, opTracker, ioScheduler, ioOpCallbackFactory, + pageWriteCallbackFactory, btreeFields, tracer, columnManager, atomic, diskCacheManager); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java index 47492e8dab..d9557e661b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java @@ -24,6 +24,7 @@ import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.api.io.IJsonSerializable; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool; import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService; import org.apache.hyracks.storage.common.file.IResourceIdFactory; @@ -43,6 +44,11 @@ public interface IStorageManager extends Serializable, IJsonSerializable { */ IBufferCache getBufferCache(INCServiceContext ctx); + /** + * @return the column buffer pool {@link IColumnBufferPool} + */ + IColumnBufferPool getColumnBufferPool(INCServiceContext ctx); + /** * @param ctx the nc service context * @return the local resource repository {@link ILocalResourceRepository} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ColumnBufferPool.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ColumnBufferPool.java new file mode 100644 index 0000000000..5a15f12978 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ColumnBufferPool.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.storage.common.buffercache; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * ColumnBufferPool: a semaphore-based buffer pool for columnar buffer management. + * + * NOTE: getBuffer() and recycle() are called very frequently and are designed to be as lightweight as possible. + */ +public class ColumnBufferPool implements IColumnBufferPool, ILifeCycleComponent { + protected static final Logger LOGGER = LogManager.getLogger(); + + private final BlockingQueue<ByteBuffer> bufferPool; + private final AtomicLong totalAllocatedMemoryInBytes; + private final AtomicLong totalPooledMemoryInBytes; + private final AtomicLong numAllocNew; + + // Semaphore for buffer-based allocation + private final Semaphore bufferSemaphore; + private final long maxBufferPoolMemoryLimit; + private final double columnBufferPoolMemoryPercentage; + private final int columnBufferInBytes; + private final int columnBufferPoolMaxSize; + private final int maxBuffers; + + // Timeout for buffer reservation in milliseconds + private final long reserveTimeoutMillis; + + /** + * @param columnBufferInBytes buffer size in bytes + * @param columnBufferPoolMaxSize max number of buffers in pool + * @param columnBufferPoolMemoryPercentage max percentage of total memory for pool + * @param reserveTimeoutMillis timeout in milliseconds for buffer reservation + */ + public ColumnBufferPool(int columnBufferInBytes, int columnBufferPoolMaxSize, + double columnBufferPoolMemoryPercentage, long reserveTimeoutMillis) { + this.totalAllocatedMemoryInBytes = new AtomicLong(0); + this.bufferPool = new ArrayBlockingQueue<>(columnBufferPoolMaxSize); + this.columnBufferPoolMemoryPercentage = columnBufferPoolMemoryPercentage; + this.columnBufferPoolMaxSize = columnBufferPoolMaxSize; + this.reserveTimeoutMillis = reserveTimeoutMillis; + this.maxBufferPoolMemoryLimit = getMaxBufferPoolMemoryLimit(columnBufferInBytes, + columnBufferPoolMemoryPercentage, columnBufferPoolMaxSize); + this.maxBuffers = (int) (maxBufferPoolMemoryLimit / columnBufferInBytes); + this.bufferSemaphore = new Semaphore(maxBuffers, true); + this.columnBufferInBytes = columnBufferInBytes; + this.totalPooledMemoryInBytes = new AtomicLong(0); + this.numAllocNew = new AtomicLong(0); + initializePool(); + LOGGER.info( + "ColumnBufferPool initialized: columnBufferPoolMaxSize={}, maxBufferPoolMemoryLimit={}, maxBuffers={}, columnBufferInBytes={}, reserveTimeoutMillis={}", + columnBufferPoolMaxSize, maxBufferPoolMemoryLimit, maxBuffers, columnBufferInBytes, + reserveTimeoutMillis); + } + + /** + * Pre-allocate buffers for fast buffer access. + * The number of buffers pre-allocated is the minimum of half the pool size and the memory limit. 
+ */ + private void initializePool() { + int halfPoolSize = columnBufferPoolMaxSize / 2; + int numBuffersToAllocate = Math.min(halfPoolSize, maxBuffers); + + for (int i = 0; i < numBuffersToAllocate; i++) { + ByteBuffer buffer = ByteBuffer.allocate(columnBufferInBytes); + bufferPool.add(buffer); + totalPooledMemoryInBytes.addAndGet(columnBufferInBytes); + } + LOGGER.info("ColumnBufferPool pre-allocated {} buffers ({} bytes each)", numBuffersToAllocate, + columnBufferInBytes); + } + + /** + * Reserve the specified number of buffers, blocking up to the configured timeout if necessary. + * @param requestedBuffers number of buffers to reserve + * @throws InterruptedException if interrupted while waiting + * @throws IllegalStateException if unable to reserve within the timeout + */ + @Override + public void reserve(int requestedBuffers) throws InterruptedException { + if (requestedBuffers <= 0) { + return; + } + boolean acquired = bufferSemaphore.tryAcquire(requestedBuffers, reserveTimeoutMillis, TimeUnit.MILLISECONDS); + if (!acquired) { + LOGGER.error("Failed to reserve {} buffers within {} ms ({} sec)", requestedBuffers, reserveTimeoutMillis, + TimeUnit.MILLISECONDS.toSeconds(reserveTimeoutMillis)); + throw new IllegalStateException("Timeout while reserving column buffers (" + requestedBuffers + ") after " + + reserveTimeoutMillis + " ms"); + } + } + + @Override + public boolean tryReserve(int requestedBuffers) { + if (requestedBuffers <= 0) { + return true; + } + return bufferSemaphore.tryAcquire(requestedBuffers); + } + + @Override + public void release(int buffers) { + if (buffers <= 0) { + return; + } + bufferSemaphore.release(buffers); + } + + /** + * Fast path for buffer acquisition. Called very frequently. + */ + @Override + public ByteBuffer getBuffer() { + // Fast path: try to poll from pool + ByteBuffer buffer = bufferPool.poll(); + if (buffer != null) { + totalPooledMemoryInBytes.addAndGet(-columnBufferInBytes); + buffer.clear(); + return buffer; + } + + // Slow path: allocate new buffer if quota allows + ensureAvailableQuota(); + + numAllocNew.incrementAndGet(); + buffer = ByteBuffer.allocate(columnBufferInBytes); + totalAllocatedMemoryInBytes.addAndGet(columnBufferInBytes); + return buffer; + } + + /** + * Fast path for buffer recycling. Called very frequently. + */ + @Override + public void recycle(ByteBuffer buffer) { + if (buffer == null) { + throw new IllegalStateException("buffer is null"); + } + + // Try to return to pool; if full, discard + if (bufferPool.offer(buffer)) { + totalPooledMemoryInBytes.addAndGet(columnBufferInBytes); + } else { + totalAllocatedMemoryInBytes.addAndGet(-columnBufferInBytes); + } + } + + @Override + public int getMaxReservedBuffers() { + return maxBuffers; + } + + /** + * Ensures that the total allocated memory does not exceed the maxBufferPoolMemoryLimit limit. + * Throws if the limit would be exceeded. + */ + private void ensureAvailableQuota() { + long spaceAcquiredByPool = (long) columnBufferPoolMaxSize * columnBufferInBytes; + long totalAllocated = totalAllocatedMemoryInBytes.get(); + long totalIfAllocated = totalAllocated + spaceAcquiredByPool; + if (totalIfAllocated > maxBufferPoolMemoryLimit) { + LOGGER.error( + "Cannot allocate more buffers, memory limit reached. " + + "totalAllocatedMemoryInBytes={}, maxBufferPoolMemoryLimit={}, columnBufferPoolMaxSize={}, columnBufferInBytes={}, " + + "columnBufferPoolMemoryPercentage={}, totalIfAllocated={}, numAllocNew={}, availableBuffers={}. 
" + + "Consider increasing storageColumnBufferPoolMemoryPercentage (current: {}).", + totalAllocated, maxBufferPoolMemoryLimit, columnBufferPoolMaxSize, columnBufferInBytes, + columnBufferPoolMemoryPercentage, totalIfAllocated, numAllocNew.get(), + bufferSemaphore.availablePermits(), columnBufferPoolMemoryPercentage); + throw new IllegalStateException("Cannot allocate more buffers, maxBufferPoolMemoryLimit (" + + maxBufferPoolMemoryLimit + ") reached."); + } + } + + private long getMaxBufferPoolMemoryLimit(int columnBufferInBytes, double columnBufferPoolMemoryPercentage, + int columnBufferPoolMaxSize) { + long totalMemory = Runtime.getRuntime().totalMemory(); + return (long) Math.max(totalMemory * (columnBufferPoolMemoryPercentage / 100), + columnBufferInBytes * columnBufferPoolMaxSize); + } + + @Override + public void close() { + bufferPool.clear(); + totalPooledMemoryInBytes.set(0); + totalAllocatedMemoryInBytes.set(0); + LOGGER.info("ColumnBufferPool closed. numAllocNew={}", numAllocNew.get()); + } + + @Override + public void start() { + // No-op + } + + @Override + public void dumpState(OutputStream os) throws IOException { + long pooledBytes = totalPooledMemoryInBytes.get(); + long totalAllocatedBytes = totalAllocatedMemoryInBytes.get(); + String buffer = "ColumnBufferPool State:\n" + "columnBufferPoolMaxSize: " + columnBufferPoolMaxSize + "\n" + + "Max Buffers: " + maxBuffers + "\n" + "Total Pooled Memory (bytes): " + pooledBytes + "\n" + + "Total Allocated Memory (bytes): " + totalAllocatedBytes + "\n" + "Number of Buffers Allocated New: " + + numAllocNew.get() + "\n" + "Available Buffers: " + bufferSemaphore.availablePermits(); + os.write(buffer.getBytes()); + LOGGER.info("dumpState called:\n{}", buffer); + } + + @Override + public void stop(boolean dumpState, OutputStream ouputStream) throws IOException { + close(); + if (dumpState && ouputStream != null) { + dumpState(ouputStream); + } + } +} diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FreeColumnBufferPool.java similarity index 55% copy from asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java copy to hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FreeColumnBufferPool.java index aa3cb71aaf..e47478f4b0 100644 --- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FreeColumnBufferPool.java @@ -16,41 +16,47 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.column.common.buffer; +package org.apache.hyracks.storage.common.buffercache; import java.nio.ByteBuffer; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp; +import org.apache.hyracks.util.StorageUtil; -public class NoOpWriteMultiPageOp implements IColumnWriteMultiPageOp { - public static final IColumnWriteMultiPageOp INSTANCE = new NoOpWriteMultiPageOp(); +public class FreeColumnBufferPool implements IColumnBufferPool { + private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.KILOBYTE); + + @Override + public void reserve(int requestedBuffers) throws InterruptedException { - private NoOpWriteMultiPageOp() { } @Override - public ByteBuffer confiscatePersistent() throws HyracksDataException { - return null; + public boolean tryReserve(int requestedBuffers) { + return true; } @Override - public ByteBuffer confiscatePageZeroPersistent() throws HyracksDataException { - return null; + public void release(int buffers) { + } @Override - public ByteBuffer confiscateTemporary() throws HyracksDataException { - return null; + public ByteBuffer getBuffer() { + return ByteBuffer.allocate(INITIAL_BUFFER_SIZE); } @Override - public void persist() throws HyracksDataException { + public int getMaxReservedBuffers() { + return 0; + } + + @Override + public void recycle(ByteBuffer buffer) { } @Override - public int getNumberOfPersistentBuffers() { - return 0; + public void close() { + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IColumnBufferPool.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IColumnBufferPool.java new file mode 100644 index 0000000000..9fc3027799 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IColumnBufferPool.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.common.buffercache; + +import java.nio.ByteBuffer; + +/** + * Interface for a buffer-based pool managing column buffer allocation. + * + * The contract is: + * - Callers must reserve semaphore permits (blocking or non-blocking) before acquiring buffers. + * - Semaphore permits must be released when no longer needed. + * - Buffers are acquired via getBuffer() and returned via recycle(). + * - The pool may block or fail to allocate if semaphore permits are exhausted. + */ +public interface IColumnBufferPool { + + /** + * Reserve the specified number of buffers, blocking if necessary until available. 
+ * @param requestedBuffers number of buffers to reserve + * @throws InterruptedException if interrupted while waiting + */ + void reserve(int requestedBuffers) throws InterruptedException; + + /** + * Attempt to reserve the specified number of buffers without blocking. + * @param requestedBuffers number of buffers to reserve + * @return true if buffers were reserved, false otherwise + */ + boolean tryReserve(int requestedBuffers); + + /** + * Release the specified number of previously reserved buffers. + * @param buffers number of buffers to release + */ + void release(int buffers); + + /** + * Acquire a buffer from the pool. Requires a reserved buffer. + * @return a ByteBuffer for use + */ + ByteBuffer getBuffer(); + + /** + * Return a buffer to the pool for reuse. + * @param buffer the buffer to recycle + */ + void recycle(ByteBuffer buffer); + + /** + * Returns the maximum number of buffers that can be reserved from this pool. + * This represents the upper bound on concurrent buffer allocations. + * + * @return the maximum number of buffers available in the pool + */ + int getMaxReservedBuffers(); + + /** + * Close the pool and release any resources. + */ + void close(); +} diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java index 0b4d2ed72b..ead6dd2d0c 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java @@ -26,6 +26,7 @@ import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.IResourceLifecycleManager; import org.apache.hyracks.storage.common.IStorageManager; import org.apache.hyracks.storage.common.buffercache.IBufferCache; +import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool; import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService; import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService; import org.apache.hyracks.storage.common.file.ResourceIdFactory; @@ -47,6 +48,11 @@ public class TestStorageManager implements IStorageManager { return TestStorageManagerComponentHolder.getBufferCache(ctx); } + @Override + public IColumnBufferPool getColumnBufferPool(INCServiceContext ctx) { + return TestStorageManagerComponentHolder.getColumnBufferPool(ctx); + } + @Override public ILocalResourceRepository getLocalResourceRepository(INCServiceContext ctx) { return TestStorageManagerComponentHolder.getLocalResourceRepository(); diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java index 7f696b4e95..5788823a94 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import 
org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -39,11 +40,13 @@ import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.IResourceLifecycleManager; import org.apache.hyracks.storage.common.buffercache.BufferCache; import org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy; +import org.apache.hyracks.storage.common.buffercache.ColumnBufferPool; import org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator; import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy; import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator; +import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool; import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy; import org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider; import org.apache.hyracks.storage.common.file.FileMapManager; @@ -56,6 +59,7 @@ import org.apache.hyracks.storage.common.file.TransientLocalResourceRepositoryFa public class TestStorageManagerComponentHolder { private static IBufferCache bufferCache; + private static IColumnBufferPool columnBufferPool; private static IFileMapProvider fileMapProvider; private static IOManager ioManager; private static ILocalResourceRepository localResourceRepository; @@ -76,6 +80,7 @@ public class TestStorageManagerComponentHolder { fileMapProvider = null; localResourceRepository = null; lcManager = null; + columnBufferPool = null; } public synchronized static IResourceLifecycleManager<IIndex> getIndexLifecycleManager() { @@ -93,6 +98,13 @@ public class TestStorageManagerComponentHolder { return bufferCache; } + public synchronized static IColumnBufferPool getColumnBufferPool(INCServiceContext ctx) { + if (columnBufferPool == null) { + return getColumnBufferPool(); + } + return columnBufferPool; + } + private synchronized static IFileMapProvider getFileMapProvider() { if (fileMapProvider == null) { fileMapProvider = new FileMapManager(); @@ -142,6 +154,15 @@ public class TestStorageManagerComponentHolder { return resourceIdFactory; } + public static IColumnBufferPool getColumnBufferPool() { + if (columnBufferPool != null) { + return columnBufferPool; + } + + columnBufferPool = new ColumnBufferPool(4 * 1024, 500, 3.0, TimeUnit.MINUTES.toMillis(2)); + return columnBufferPool; + } + public static IBufferCache getBufferCache(IIOManager ioManager) { if (bufferCache != null) { return bufferCache; diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/ColumnBufferPoolTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/ColumnBufferPoolTest.java new file mode 100644 index 0000000000..0428b48e06 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/ColumnBufferPoolTest.java @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hyracks.storage.common.buffercache.ColumnBufferPool; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class ColumnBufferPoolTest { + + private ColumnBufferPool columnBufferPool; + private static final int COLUMN_BUFFER_GRANULE_BYTES = 4096; // 4KB + private static final int POOL_SIZE = 10; + private static final double CAPPED_PERCENTAGE = 10.0; // 10% of runtime memory + private static final long RESERVE_TIMEOUT_MILLIS = 5000; // 5 seconds timeout + + @Before + public void setUp() { + columnBufferPool = + new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, CAPPED_PERCENTAGE, RESERVE_TIMEOUT_MILLIS); + } + + @After + public void tearDown() { + if (columnBufferPool != null) { + columnBufferPool.close(); + } + } + + @Test + public void testBasicCreditReservationAndRelease() throws InterruptedException { + // Reserve credits + columnBufferPool.reserve(5); + // Release credits + columnBufferPool.release(5); + } + + @Test + public void testReserveExtraCreditsFor0thTuple() throws InterruptedException { + // Simulate scenario: initially reserve 3 credits, then need 8 total for 0th tuple + columnBufferPool.reserve(3); + // Need 5 more credits for 0th tuple (blocking call) + columnBufferPool.reserve(5); // This should block until credits are available + // Release all credits + columnBufferPool.release(8); + } + + @Test + public void testTryAcquiringExtraCreditsForNon0thTuple() throws InterruptedException { + // Simulate scenario: initially have 2 credits, try to get 5 total (non-blocking) + columnBufferPool.reserve(2); + // Try to acquire 3 more credits for non-0th tuple (should succeed if available) + boolean success = columnBufferPool.tryReserve(3); + assertTrue("Should be able to reserve extra credits", success); + // Release all credits + columnBufferPool.release(5); + } + + @Test + public void testTryReserveFailure() throws InterruptedException { + // Reserve most credits + int maxCredits = columnBufferPool.getMaxReservedBuffers(); + columnBufferPool.reserve(maxCredits - 2); + // Try to reserve more than available + boolean success = columnBufferPool.tryReserve(5); + assertFalse("Should fail to reserve more credits than available", success); + // Release credits + columnBufferPool.release(maxCredits - 2); + } + + @Test(expected = IllegalStateException.class, timeout = 10000) + public void testReserveTimeout() throws 
InterruptedException { + // Use a shorter timeout for this test + ColumnBufferPool shortTimeoutPool = + new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, CAPPED_PERCENTAGE, 100); // 100ms timeout + try { + // Reserve all credits + int maxCredits = shortTimeoutPool.getMaxReservedBuffers(); + shortTimeoutPool.reserve(maxCredits); + // This should timeout and throw IllegalStateException + shortTimeoutPool.reserve(1); + } finally { + shortTimeoutPool.close(); + } + } + + @Test + public void testBufferAcquisitionAndRelease() throws InterruptedException { + // Reserve credits for buffers + columnBufferPool.reserve(3); + List<ByteBuffer> acquiredBuffers = new ArrayList<>(); + // Acquire buffers + for (int i = 0; i < 3; i++) { + ByteBuffer buffer = columnBufferPool.getBuffer(); + assertNotNull("Buffer should not be null", buffer); + assertEquals("Buffer should have correct capacity", COLUMN_BUFFER_GRANULE_BYTES, buffer.capacity()); + acquiredBuffers.add(buffer); + } + // Release buffers + for (ByteBuffer buffer : acquiredBuffers) { + columnBufferPool.recycle(buffer); + } + // Release credits + columnBufferPool.release(3); + } + + @Test + public void testBufferAcquisitionAndReleaseInBatches() throws InterruptedException { + // Step 1: Reserve credits upfront + columnBufferPool.reserve(6); + + List<ByteBuffer> acquiredBuffers = new ArrayList<>(); + + // Step 2: Acquire buffers for first batch + for (int i = 0; i < 3; i++) { + ByteBuffer buffer = columnBufferPool.getBuffer(); + assertNotNull(buffer); + assertEquals(COLUMN_BUFFER_GRANULE_BYTES, buffer.capacity()); + acquiredBuffers.add(buffer); + } + + // Step 3: Release buffers after first batch (flush happens) + for (ByteBuffer buffer : acquiredBuffers) { + columnBufferPool.recycle(buffer); + } + acquiredBuffers.clear(); + + // Step 4: Acquire buffers for second batch + for (int i = 0; i < 2; i++) { + ByteBuffer buffer = columnBufferPool.getBuffer(); + assertNotNull(buffer); + acquiredBuffers.add(buffer); + } + + // Step 5: Release buffers after second batch + for (ByteBuffer buffer : acquiredBuffers) { + columnBufferPool.recycle(buffer); + } + + // Step 6: Release remaining credits + columnBufferPool.release(6); + } + + @Test + public void testCompleteThreadLifecycle() throws InterruptedException { + // Step 1: Reserve initial credits + columnBufferPool.reserve(4); + + // Step 2: Reserve extra credits for 0th tuple + columnBufferPool.reserve(3); // Now have 7 total + + // Step 3: Process multiple batches + for (int batch = 0; batch < 3; batch++) { + List<ByteBuffer> batchBuffers = new ArrayList<>(); + + // Acquire buffers for this batch + for (int i = 0; i < 2; i++) { + ByteBuffer buffer = columnBufferPool.getBuffer(); + assertNotNull(buffer); + batchBuffers.add(buffer); + } + + // Simulate processing + Thread.sleep(10); + + // Release buffers after batch processing + for (ByteBuffer buffer : batchBuffers) { + columnBufferPool.recycle(buffer); + } + } + + // Step 4: Release remaining credits + columnBufferPool.release(7); + } + + @Test + public void testMultipleThreadsConcurrentExecution() throws InterruptedException { + // Use shorter timeout to make timeouts happen faster in test + ColumnBufferPool testPool = + new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, CAPPED_PERCENTAGE, 500); // 500ms timeout + + // Helper for timestamped log + java.util.function.Consumer<String> logWithTimestamp = msg -> { + System.out.println("[" + java.time.Instant.now() + "] " + msg); + }; + + try { + int numThreads = 5; // More threads to 
increase contention + int maxCredits = testPool.getMaxReservedBuffers(); + logWithTimestamp.accept("maxCredits: " + maxCredits); + int creditsPerThread = (maxCredits * 3) / 4; // Each thread wants 75% of total credits - forces contention! + + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch completionLatch = new CountDownLatch(numThreads); + AtomicInteger successfulThreads = new AtomicInteger(0); + AtomicInteger threadOrder = new AtomicInteger(0); + AtomicInteger totalRetries = new AtomicInteger(0); + + for (int threadId = 0; threadId < numThreads; threadId++) { + final int id = threadId; + executor.submit(() -> { + try { + startLatch.await(); + int threadRetryCount = 0; + boolean threadSuccessful = false; + + // Thread-level retry: retry the entire thread operation + while (!threadSuccessful && threadRetryCount < 100) { + int totalReservedCredits = 0; // Track credits reserved in this attempt + try { + if (threadRetryCount > 0) { + totalRetries.incrementAndGet(); + logWithTimestamp + .accept("Thread " + id + " starting retry attempt #" + threadRetryCount); + // Exponential backoff with thread-specific variation before retrying + int backoffTime = 100 + (threadRetryCount * 50) + (id * 25); // Thread-specific backoff + Thread.sleep(backoffTime); + } + + int myOrder = threadOrder.incrementAndGet(); + + // Step 1: Reserve initial credits (no individual retry - fail fast) + int initialCredits = creditsPerThread / 2; + long startTime = System.currentTimeMillis(); + testPool.reserve(initialCredits); + totalReservedCredits += initialCredits; // Track successful reservation + long reserveTime = System.currentTimeMillis() - startTime; + + logWithTimestamp.accept("Thread " + id + " (order: " + myOrder + + ") reserved initial credits (" + initialCredits + " credits) in " + + reserveTime + "ms" + + (threadRetryCount > 0 ? " on thread retry #" + threadRetryCount : "")); + + // Step 2: Reserve extra credits for 0th tuple (no individual retry - fail fast) + int extraCredits = creditsPerThread / 2; + startTime = System.currentTimeMillis(); + testPool.reserve(extraCredits); // Now have creditsPerThread total + totalReservedCredits += extraCredits; // Track successful reservation + reserveTime = System.currentTimeMillis() - startTime; + + logWithTimestamp.accept("Thread " + id + " reserved extra credits (" + extraCredits + + " credits) in " + reserveTime + "ms"); + + // Step 3: Hold credits for longer than timeout to force other threads to timeout + // Only the first thread (or first few) will succeed initially + // Varied sleep patterns per thread to create different contention scenarios + int baseSleep = 600 + (id * 100); // Thread 0: 600ms, Thread 1: 700ms, etc. + Thread.sleep(baseSleep); + + // Step 4: Process some buffers while holding many credits + List<ByteBuffer> buffers = new ArrayList<>(); + for (int i = 0; i < Math.min(3, creditsPerThread); i++) { + ByteBuffer buffer = testPool.getBuffer(); + buffers.add(buffer); + } + + // Simulate longer processing to ensure timeouts occur + // Different processing times per thread to create realistic workload patterns + int processingTime = 150 + (id * 50); // Thread 0: 150ms, Thread 1: 200ms, etc. 
+ Thread.sleep(processingTime); + + // Release buffers but keep credits reserved + for (ByteBuffer buffer : buffers) { + testPool.recycle(buffer); + } + + // Hold credits even longer to guarantee timeouts + // Staggered final sleep to create cascading release pattern + int finalHoldTime = 250 + (id * 75); // Thread 0: 250ms, Thread 1: 325ms, etc. + Thread.sleep(finalHoldTime); + + // Step 5: Finally release all credits (allowing other threads to proceed) + testPool.release(creditsPerThread); + logWithTimestamp.accept("Thread " + id + + " completed successfully and released all credits" + (threadRetryCount > 0 + ? " after " + threadRetryCount + " thread retries" : "")); + + threadSuccessful = true; // Mark thread as successful + successfulThreads.incrementAndGet(); + + } catch (IllegalStateException e) { + // Timeout occurred - cleanup partial credits and retry entire thread operation + threadRetryCount++; + if (totalReservedCredits > 0) { + testPool.release(totalReservedCredits); + logWithTimestamp + .accept("Thread " + id + " timeout, released " + totalReservedCredits + + " partial credits, will retry entire thread operation (attempt " + + threadRetryCount + ")"); + } else { + logWithTimestamp.accept( + "Thread " + id + " timeout, will retry entire thread operation (attempt " + + threadRetryCount + ")"); + } + + } catch (Exception e) { + // Other exceptions - cleanup partial credits and retry entire thread operation + threadRetryCount++; + if (totalReservedCredits > 0) { + testPool.release(totalReservedCredits); + logWithTimestamp.accept("Thread " + id + " error: " + e.getMessage() + ", released " + + totalReservedCredits + + " partial credits, will retry entire thread operation (attempt " + + threadRetryCount + ")"); + } else { + logWithTimestamp.accept("Thread " + id + " error: " + e.getMessage() + + ", will retry entire thread operation (attempt " + threadRetryCount + + ")"); + } + } + } + + if (!threadSuccessful) { + logWithTimestamp.accept( + "Thread " + id + " failed after " + threadRetryCount + " thread retry attempts"); + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + completionLatch.countDown(); + } + }); + } + + // Start all threads at once to maximize contention + startLatch.countDown(); + + // Wait for completion with much longer timeout since threads will be holding credits for 1+ seconds and retrying + assertTrue("All threads should complete", completionLatch.await(120, TimeUnit.SECONDS)); + assertEquals("All threads should succeed", numThreads, successfulThreads.get()); + + // Verify that thread-level retries actually occurred (confirms timeout/retry mechanism) + int totalRetriesCount = totalRetries.get(); + logWithTimestamp.accept("Total thread retries across all threads: " + totalRetriesCount); + assertTrue("Should have some thread retries due to contention", totalRetriesCount > 0); + + executor.shutdown(); + assertTrue("Executor should terminate", executor.awaitTermination(15, TimeUnit.SECONDS)); + + } finally { + testPool.close(); + } + } + + @Test + public void testPoolCapacity() throws InterruptedException { + // Reserve credits for buffer operations + columnBufferPool.reserve(POOL_SIZE); + + List<ByteBuffer> acquiredBuffers = new ArrayList<>(); + + // Acquire buffers up to initial pool size (POOL_SIZE/2) + for (int i = 0; i < POOL_SIZE / 2; i++) { + ByteBuffer buffer = columnBufferPool.getBuffer(); + assertNotNull("Buffer " + i + " should not be null", buffer); + assertEquals("Buffer should have correct capacity", 
COLUMN_BUFFER_GRANULE_BYTES, buffer.capacity()); + acquiredBuffers.add(buffer); + } + + // Acquire additional buffers (should allocate new ones) + int additionalBuffers = POOL_SIZE / 2; + for (int i = 0; i < additionalBuffers; i++) { + ByteBuffer buffer = columnBufferPool.getBuffer(); + assertNotNull("Additional buffer " + i + " should not be null", buffer); + assertEquals("Buffer should have correct capacity", COLUMN_BUFFER_GRANULE_BYTES, buffer.capacity()); + acquiredBuffers.add(buffer); + } + + assertEquals("Total buffers should match reserved credits", POOL_SIZE, acquiredBuffers.size()); + + // Release all buffers + for (ByteBuffer buffer : acquiredBuffers) { + columnBufferPool.recycle(buffer); + } + + // Verify pool can still acquire buffers after release + ByteBuffer testBuffer = columnBufferPool.getBuffer(); + assertNotNull("Should be able to acquire buffer after release", testBuffer); + assertEquals("Buffer should have correct capacity", COLUMN_BUFFER_GRANULE_BYTES, testBuffer.capacity()); + + // Release the test buffer + columnBufferPool.recycle(testBuffer); + + // Release credits + columnBufferPool.release(POOL_SIZE); + } + + @Test + public void testBootstrapInitialization() throws InterruptedException { + // Verify the pool can acquire pre-initialized buffers + columnBufferPool.reserve(POOL_SIZE / 2); + + // Acquire all pre-initialized buffers + List<ByteBuffer> buffers = new ArrayList<>(); + for (int i = 0; i < POOL_SIZE / 2; i++) { + ByteBuffer buffer = columnBufferPool.getBuffer(); + assertNotNull("Pre-initialized buffer " + i + " should be available", buffer); + assertEquals("Buffer should have correct capacity", COLUMN_BUFFER_GRANULE_BYTES, buffer.capacity()); + buffers.add(buffer); + } + + // Release buffers + for (ByteBuffer buffer : buffers) { + columnBufferPool.recycle(buffer); + } + + // Release credits + columnBufferPool.release(POOL_SIZE / 2); + } + + @Test + public void testCreditManagement() throws InterruptedException { + int maxCredits = columnBufferPool.getMaxReservedBuffers(); + assertTrue("Max credits should be positive", maxCredits > 0); + // Reserve credits + columnBufferPool.reserve(5); + // Try to reserve more + boolean success = columnBufferPool.tryReserve(3); + assertTrue("Should be able to reserve more credits", success); + // Release all credits + columnBufferPool.release(8); + } + + @Test + public void testBufferRecycling() throws InterruptedException { + // Reserve credits + columnBufferPool.reserve(2); + // Acquire buffer + ByteBuffer buffer1 = columnBufferPool.getBuffer(); + assertNotNull(buffer1); + // Recycle buffer + columnBufferPool.recycle(buffer1); + // Acquire another buffer (should potentially reuse) + ByteBuffer buffer2 = columnBufferPool.getBuffer(); + assertNotNull(buffer2); + // Recycle second buffer + columnBufferPool.recycle(buffer2); + // Release credits + columnBufferPool.release(2); + } + + @Test(expected = IllegalStateException.class) + public void testMemoryQuotaEnforcement() throws InterruptedException { + // Create a small pool to test quota enforcement + ColumnBufferPool smallPool = new ColumnBufferPool(1024, 2, 0.001, 1000); // Very small percentage + try { + int maxCredits = smallPool.getMaxReservedBuffers(); + smallPool.reserve(maxCredits); + // Try to acquire more buffers than the quota allows + List<ByteBuffer> buffers = new ArrayList<>(); + for (int i = 0; i < maxCredits + 10; i++) { // Try to exceed quota + ByteBuffer buffer = smallPool.getBuffer(); + buffers.add(buffer); + } + } finally { + smallPool.close(); + } + } + 
+ @Test(expected = IllegalStateException.class) + public void testRecycleNullBuffer() { + columnBufferPool.recycle(null); + } + + @Test + public void testEdgeCases() throws InterruptedException { + // Test zero credits + columnBufferPool.reserve(0); // Should be no-op + columnBufferPool.release(0); // Should be no-op + // Test negative credits + assertTrue("tryReserve with negative should return true", columnBufferPool.tryReserve(-1)); + // Test max credits + int maxCredits = columnBufferPool.getMaxReservedBuffers(); + assertTrue("Max credits should be positive", maxCredits > 0); + } + + // Helper method to access maxCredits + private int getMaxCredits() { + return columnBufferPool.getMaxReservedBuffers(); + } +}
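
As a quick reference for the contract documented on IColumnBufferPool above, the following minimal sketch walks through reserve/getBuffer/recycle/release in the order a writer is expected to use them. It is illustrative only: the class name ColumnBufferPoolUsageSketch and the column count of 8 are made up, and the constructor arguments simply mirror the values TestStorageManagerComponentHolder uses (4 KB buffers, a 500-buffer pool, 3% of JVM memory, a 2-minute reservation timeout).

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;

    import org.apache.hyracks.storage.common.buffercache.ColumnBufferPool;
    import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool;

    public class ColumnBufferPoolUsageSketch {
        public static void main(String[] args) throws InterruptedException {
            // Illustrative configuration; mirrors TestStorageManagerComponentHolder.
            IColumnBufferPool pool = new ColumnBufferPool(4 * 1024, 500, 3.0, TimeUnit.MINUTES.toMillis(2));
            int columnCount = 8; // hypothetical; a real writer derives this from getAbsoluteNumberOfColumns(true)
            // Reserve permits first; this blocks up to the configured timeout and throws
            // IllegalStateException if the permits cannot be obtained in time.
            pool.reserve(columnCount);
            List<ByteBuffer> buffers = new ArrayList<>();
            try {
                for (int i = 0; i < columnCount; i++) {
                    buffers.add(pool.getBuffer()); // fast path reuses a pooled buffer, else allocates a new one
                }
                // ... write column data into the buffers ...
            } finally {
                for (ByteBuffer b : buffers) {
                    pool.recycle(b); // return buffers for reuse; the semaphore permits remain held
                }
                pool.release(columnCount); // give the permits back so other writers can reserve
                pool.close();
            }
        }
    }

This is the same sequence ColumnBTreeBulkloader follows in the diff above: handleFirstTuple() reserves additional permits based on getAbsoluteNumberOfColumns(true), confiscateTemporary0thBuffer() and releaseTemporary0thBuffer() delegate to getBuffer() and recycle(), and end(), writeFullLeafPage() and abort() release the held permits in their finally blocks.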
