This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit b656c08860419c7e86c9c261cd37f7dfcc9f5db7 Author: Ritik Raj <[email protected]> AuthorDate: Fri Aug 29 19:46:45 2025 +0530 [ASTERIXDB-3636][STO] Fix buffer reservations underestimation - user model changes: no - storage format changes: no - interface changes: yes Details: Previously, buffer reservation assumed each column required a single temporary buffer. However, when the column type is a non-primary string field, the system internally acquires three buffers per column, which was not accounted for. This led to under-reservation, and during leaf flush operations, more buffers were acquired than reserved, potentially exceeding the column buffer pool capacity. Additionally, unlike regular flushes where the maximum concurrency equals the number of compute partitions, in IndexBulkLoading, every storage partition on a node creates its own bulkLoader. This significantly increases buffer demand. For example: • A node with 128 storage partitions • A document with 80 columns (20% string fields) Buffer requirement = (20% of 80 × 3) + (80% of 80 × 1) = 112 buffers This change updates buffer reservation logic to correctly account for string fields. Furthermore, for JVMs with < 8GiB memory, the column buffer pool is capped at 100MiB. 
Ext-ref: MB-68059 Change-Id: I3bfb9e9ac4b908ee2e7d84cab4781f6e0f7444e8 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20283 Reviewed-by: Murtadha Hubail <[email protected]> Reviewed-by: Michael Blow <[email protected]> Tested-by: Michael Blow <[email protected]> --- .../apache/asterix/app/nc/NCAppRuntimeContext.java | 11 ++- .../api/cluster_state_1/cluster_state_1.1.regexadm | 4 - .../cluster_state_1_full.1.regexadm | 4 - .../cluster_state_1_less.1.regexadm | 4 - ...tractParquetDeltaBinaryPackingValuesWriter.java | 2 + .../bytes/encoder/ParquetDeltaByteArrayWriter.java | 4 + .../ParquetDeltaLengthByteArrayValuesWriter.java | 3 + .../ParquetPlainFixedLengthValuesWriter.java | 2 + .../ParquetPlainVariableLengthValuesWriter.java | 2 + .../operation/lsm/flush/FlushColumnMetadata.java | 25 ++++-- .../lsm/flush/FlushColumnTupleWriter.java | 12 +++ .../lsm/flush/NoWriteFlushColumnMetadata.java | 45 ++++++++-- .../lsm/merge/MergeColumnTupleWriter.java | 9 ++ .../lsm/merge/MergeColumnWriteMetadata.java | 13 ++- .../asterix/column/values/IColumnValuesWriter.java | 5 ++ .../values/writer/AbstractColumnValuesWriter.java | 2 + .../values/writer/BooleanColumnValuesWriter.java | 9 ++ .../values/writer/DoubleColumnValuesWriter.java | 9 ++ .../values/writer/FloatColumnValuesWriter.java | 9 ++ .../values/writer/LongColumnValuesWriter.java | 15 +++- .../values/writer/NoOpColumnValuesWriter.java | 9 ++ .../writer/NullMissingColumnValuesWriter.java | 9 ++ .../values/writer/StringColumnValuesWriter.java | 12 +++ .../values/writer/UUIDColumnValuesWriter.java | 11 ++- .../values/writer/DummyColumnValuesWriter.java | 5 ++ .../asterix/common/config/StorageProperties.java | 33 +++++--- .../examples/btree/helper/RuntimeContext.java | 3 +- .../column/api/AbstractColumnTupleWriter.java | 6 ++ .../column/impls/btree/ColumnBTreeBulkloader.java | 19 ++--- .../common/buffercache/ColumnBufferPool.java | 99 ++++++++++++---------- .../common/buffercache/FreeColumnBufferPool.java | 17 
++++ .../common/buffercache/IColumnBufferPool.java | 4 +- .../support/TestStorageManagerComponentHolder.java | 3 +- .../storage/common/ColumnBufferPoolTest.java | 16 ++-- 34 files changed, 320 insertions(+), 115 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 9d80525f20..53fb3cb6bb 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -148,7 +148,7 @@ import org.apache.logging.log4j.Logger; public class NCAppRuntimeContext implements INcApplicationContext { private static final Logger LOGGER = LogManager.getLogger(); - public static final double INVALID_BUFFER_POOL_MEMORY_PERCENTAGE = 0.0; + public static final long INVALID_BUFFER_POOL_MAX_MEMORY = 0; private ILSMMergePolicyFactory metadataMergePolicyFactory; private final INCServiceContext ncServiceContext; @@ -330,14 +330,13 @@ public class NCAppRuntimeContext implements INcApplicationContext { fileInfoMap, defaultContext); } - if (storageProperties.getColumnBufferPoolMemoryPercentage() <= INVALID_BUFFER_POOL_MEMORY_PERCENTAGE) { - LOGGER.info("Using FreeColumnBufferPool since column buffer pool size percentage is {}", + if (storageProperties.getColumnBufferPoolMaxMemory() <= INVALID_BUFFER_POOL_MAX_MEMORY) { + LOGGER.info("Using FreeColumnBufferPool since column buffer pool max memory is {}", storageProperties.getColumnBufferSize()); this.columnBufferPool = new FreeColumnBufferPool(); } else { this.columnBufferPool = new ColumnBufferPool(storageProperties.getColumnBufferSize(), - storageProperties.getColumnBufferPoolMaxSize(), - storageProperties.getColumnBufferPoolMemoryPercentage(), + storageProperties.getColumnBufferPoolMaxSize(), storageProperties.getColumnBufferPoolMaxMemory(), 
storageProperties.getColumnBufferAcquireTimeout()); } @@ -363,7 +362,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { ILifeCycleComponentManager lccm = getServiceContext().getLifeCycleComponentManager(); lccm.register((ILifeCycleComponent) virtualBufferCache); lccm.register((ILifeCycleComponent) bufferCache); - lccm.register((ILifeCycleComponent) columnBufferPool); + lccm.register(columnBufferPool); /* * LogManager must be stopped after RecoveryManager, DatasetLifeCycleManager, and ReplicationManager * to process any logs that might be generated during stopping these components diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm index 24f084d814..6c78c21b42 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm @@ -90,10 +90,6 @@ "replication\.timeout" : 120, "ssl\.enabled" : false, "storage.buffercache.pagesize" : 32768, - "storage.column.buffer.acquire.timeout" : 120000, - "storage.column.buffer.pool.max.size" : 8000, - "storage.column.buffer.pool.memory.percentage" : 3.0, - "storage.column.buffer.size" : 4096, "storage.column.free.space.tolerance" : 0.15, "storage.column.max.leaf.node.size" : 10485760, "storage.column.max.tuple.count" : 15000, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm index 89aa04551b..c44e600a3b 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm +++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm @@ -90,10 +90,6 @@ "replication\.timeout" : 120, "ssl\.enabled" : false, "storage.buffercache.pagesize" : 32768, - "storage.column.buffer.acquire.timeout" : 120000, - "storage.column.buffer.pool.max.size" : 8000, - "storage.column.buffer.pool.memory.percentage" : 3.0, - "storage.column.buffer.size" : 4096, "storage.column.free.space.tolerance" : 0.15, "storage.column.max.leaf.node.size" : 10485760, "storage.column.max.tuple.count" : 15000, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm index 1882daff53..10eab67e07 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm @@ -90,10 +90,6 @@ "replication\.timeout" : 120, "ssl\.enabled" : false, "storage.buffercache.pagesize" : 32768, - "storage.column.buffer.acquire.timeout" : 120000, - "storage.column.buffer.pool.max.size" : 8000, - "storage.column.buffer.pool.memory.percentage" : 3.0, - "storage.column.buffer.size" : 4096, "storage.column.free.space.tolerance" : 0.15, "storage.column.max.leaf.node.size" : 10485760, "storage.column.max.tuple.count" : 15000, diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetDeltaBinaryPackingValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetDeltaBinaryPackingValuesWriter.java index 3102063137..7adb1e1aad 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetDeltaBinaryPackingValuesWriter.java +++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetDeltaBinaryPackingValuesWriter.java @@ -38,6 +38,8 @@ public abstract class AbstractParquetDeltaBinaryPackingValuesWriter extends Abst public static final int DEFAULT_NUM_MINIBLOCKS = 4; + public static final int REQUIRED_TEMPORARY_BUFFERS = 1; + protected final MultiTemporaryBufferBytesOutputStream outputStream; /** diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java index 4b09e4da21..b1c596c762 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java @@ -32,6 +32,10 @@ import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; * Re-implementation of {@link DeltaByteArrayWriter} */ public class ParquetDeltaByteArrayWriter extends AbstractParquetValuesWriter { + public static final int REQUIRED_TEMPORARY_BUFFERS = + ParquetDeltaBinaryPackingValuesWriterForInteger.REQUIRED_TEMPORARY_BUFFERS + + ParquetDeltaLengthByteArrayValuesWriter.REQUIRED_TEMPORARY_BUFFERS; + private static final IValueReference EMPTY_VALUE; private final ParquetDeltaBinaryPackingValuesWriterForInteger prefixLengthWriter; private final ParquetDeltaLengthByteArrayValuesWriter suffixWriter; diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaLengthByteArrayValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaLengthByteArrayValuesWriter.java index afab48eb82..8c8221ab35 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaLengthByteArrayValuesWriter.java +++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaLengthByteArrayValuesWriter.java @@ -34,6 +34,9 @@ import org.apache.parquet.io.ParquetEncodingException; * Re-implementation of {@link DeltaLengthByteArrayValuesWriter} */ public class ParquetDeltaLengthByteArrayValuesWriter extends AbstractParquetValuesWriter { + public static final int REQUIRED_TEMPORARY_BUFFERS = + ParquetDeltaBinaryPackingValuesWriterForInteger.REQUIRED_TEMPORARY_BUFFERS + 1; // 1 for outputStream + private final ParquetDeltaBinaryPackingValuesWriterForInteger lengthWriter; private final MultiTemporaryBufferBytesOutputStream outputStream; private final LittleEndianDataOutputStream out; diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java index 1dbaa03220..7df92b3ab9 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java @@ -35,6 +35,8 @@ import org.apache.parquet.io.ParquetEncodingException; * Re-implementation of {@link PlainValuesWriter} */ public class ParquetPlainFixedLengthValuesWriter extends AbstractParquetValuesWriter { + public static final int REQUIRED_TEMPORARY_BUFFERS = 1; // 1 for output stream + private final AbstractBytesOutputStream outputStream; private final ValueOutputStream out; diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java index 96c8bdc72e..78390a8ad5 100644 --- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java @@ -32,6 +32,8 @@ import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.io.ParquetEncodingException; public class ParquetPlainVariableLengthValuesWriter extends AbstractParquetValuesWriter { + public static final int REQUIRED_TEMPORARY_BUFFERS = 1; // 1 for output stream + private final GrowableBytesOutputStream offsetStream; private final AbstractBytesOutputStream valueStream; private final ValueOutputStream offsetWriterStream; diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java index 21ad11b3c6..1c93ebb727 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java @@ -88,11 +88,13 @@ public class FlushColumnMetadata extends AbstractColumnMetadata { private boolean changed; protected int level; protected int repeated; + protected int requiredTemporaryBuffersCount; public FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, List<List<String>> primaryKeys, List<Integer> keySourceIndicator, IColumnValuesWriterFactory columnWriterFactory, Mutable<IColumnWriteMultiPageOp> multiPageOpRef) throws HyracksDataException { super(datasetType, metaType, primaryKeys.size()); + this.requiredTemporaryBuffersCount = 0; this.multiPageOpRef = multiPageOpRef; this.columnWriterFactory = columnWriterFactory; this.orderedColumns = new IntOpenHashSet(); @@ -130,10 +132,11 @@ public class FlushColumnMetadata extends AbstractColumnMetadata { public 
FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, int numPrimaryKeys, boolean metaContainsKeys, IColumnValuesWriterFactory columnWriterFactory, Mutable<IColumnWriteMultiPageOp> multiPageOpRef, List<IColumnValuesWriter> columnWriters, - IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot, - Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels, + int requiredTemporaryBuffers, IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, + ObjectSchemaNode metaRoot, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels, ArrayBackedValueStorage serializedMetadata) { super(datasetType, metaType, numPrimaryKeys); + this.requiredTemporaryBuffersCount = requiredTemporaryBuffers; this.multiPageOpRef = multiPageOpRef; this.columnWriterFactory = columnWriterFactory; this.orderedColumns = new IntOpenHashSet(); @@ -253,7 +256,7 @@ public class FlushColumnMetadata extends AbstractColumnMetadata { //ColumnWriter List<IColumnValuesWriter> writers = new ArrayList<>(); - deserializeWriters(input, writers, columnWriterFactory); + int requiredTemporaryBuffers = deserializeWriters(input, writers, columnWriterFactory); //FieldNames IFieldNamesDictionary fieldNamesDictionary = AbstractFieldNamesDictionary.deserialize(input); @@ -270,7 +273,8 @@ public class FlushColumnMetadata extends AbstractColumnMetadata { schemaStorage.append(serializedMetadata); logSchema(root, metaRoot, fieldNamesDictionary); return new FlushColumnMetadata(datasetType, metaType, numPrimaryKeys, metaContainsKeys, columnWriterFactory, - multiPageOpRef, writers, fieldNamesDictionary, root, metaRoot, definitionLevels, schemaStorage); + multiPageOpRef, writers, requiredTemporaryBuffers, fieldNamesDictionary, root, metaRoot, + definitionLevels, schemaStorage); } @Override @@ -301,12 +305,17 @@ public class FlushColumnMetadata extends AbstractColumnMetadata { } } - public static void deserializeWriters(DataInput input, 
List<IColumnValuesWriter> writers, + public static int deserializeWriters(DataInput input, List<IColumnValuesWriter> writers, IColumnValuesWriterFactory columnWriterFactory) throws IOException { + int numberOfRequiredTemporaryBuffers = 0; int numberOfWriters = input.readInt(); for (int i = 0; i < numberOfWriters; i++) { - writers.add(AbstractColumnValuesWriter.deserialize(input, columnWriterFactory)); + IColumnValuesWriter writer = AbstractColumnValuesWriter.deserialize(input, columnWriterFactory); + numberOfRequiredTemporaryBuffers += writer.getRequiredTemporaryBuffersCount(); + writers.add(writer); } + + return numberOfRequiredTemporaryBuffers; } /* ******************************************************** @@ -578,6 +587,7 @@ public class FlushColumnMetadata extends AbstractColumnMetadata { if (multiPageOpRef.getValue() != null) { writer.reset(); } + requiredTemporaryBuffersCount += writer.getRequiredTemporaryBuffersCount(); addColumn(columnIndex, writer); return new PrimitiveSchemaNode(columnIndex, normalizedTypeTag, primaryKey); default: @@ -618,4 +628,7 @@ public class FlushColumnMetadata extends AbstractColumnMetadata { return metaContainsKeys; } + public int getRequiredTemporaryBuffersCount() { + return requiredTemporaryBuffersCount; + } } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java index c0fd0493cb..56149bc205 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java @@ -221,6 +221,11 @@ public class FlushColumnTupleWriter extends AbstractColumnTupleWriter { transformerForCurrentTuple.transform(pointable); } + @Override + public void resetTemporaryBufferForCurrentTuple() { + 
columnMetadataWithCurrentTuple.resetBufferCount(); + } + @Override public void writeTuple(ITupleReference tuple) throws HyracksDataException { //This from an in-memory component, hence the cast @@ -283,4 +288,11 @@ public class FlushColumnTupleWriter extends AbstractColumnTupleWriter { protected void writeMeta(LSMBTreeTupleReference btreeTuple) throws HyracksDataException { //NoOp } + + @Override + public int getRequiredTemporaryBuffersCountIncludingCurrentTuple() { + int requiredBuffers = columnMetadata.getRequiredTemporaryBuffersCount() + + columnMetadataWithCurrentTuple.getRequiredTemporaryBuffersCountForCurrentTuple(); + return requiredBuffers; + } } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java index 7d77a97b39..8c546e4573 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java @@ -40,7 +40,14 @@ import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode; import org.apache.asterix.column.util.RunLengthIntArray; import org.apache.asterix.column.values.IColumnValuesWriter; import org.apache.asterix.column.values.IColumnValuesWriterFactory; +import org.apache.asterix.column.values.writer.BooleanColumnValuesWriter; +import org.apache.asterix.column.values.writer.DoubleColumnValuesWriter; +import org.apache.asterix.column.values.writer.FloatColumnValuesWriter; +import org.apache.asterix.column.values.writer.LongColumnValuesWriter; import org.apache.asterix.column.values.writer.NoOpColumnValuesWriter; +import org.apache.asterix.column.values.writer.NullMissingColumnValuesWriter; +import org.apache.asterix.column.values.writer.StringColumnValuesWriter; 
+import org.apache.asterix.column.values.writer.UUIDColumnValuesWriter; import org.apache.asterix.om.dictionary.AbstractFieldNamesDictionary; import org.apache.asterix.om.dictionary.IFieldNamesDictionary; import org.apache.asterix.om.types.ARecordType; @@ -59,14 +66,17 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; public final class NoWriteFlushColumnMetadata extends FlushColumnMetadata { private int numColumns; + private int buffersRequiredByCurrentTuple; public NoWriteFlushColumnMetadata(ARecordType datasetType, ARecordType metaType, int numPrimaryKeys, boolean metaContainsKeys, IColumnValuesWriterFactory columnWriterFactory, Mutable<IColumnWriteMultiPageOp> multiPageOpRef, List<IColumnValuesWriter> writers, - IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot, - Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels, ArrayBackedValueStorage schemaStorage) { + int requiredTemporaryBuffers, IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, + ObjectSchemaNode metaRoot, Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels, + ArrayBackedValueStorage schemaStorage) { super(datasetType, metaType, numPrimaryKeys, metaContainsKeys, columnWriterFactory, multiPageOpRef, writers, - fieldNamesDictionary, root, metaRoot, definitionLevels, schemaStorage); + requiredTemporaryBuffers, fieldNamesDictionary, root, metaRoot, definitionLevels, schemaStorage); + buffersRequiredByCurrentTuple = 0; numColumns = 0; } @@ -80,7 +90,7 @@ public final class NoWriteFlushColumnMetadata extends FlushColumnMetadata { //ColumnWriter List<IColumnValuesWriter> writers = new ArrayList<>(); - deserializeWriters(input, writers, columnWriterFactory); + int requiredTemporaryBuffers = deserializeWriters(input, writers, columnWriterFactory); //FieldNames IFieldNamesDictionary fieldNamesDictionary = AbstractFieldNamesDictionary.deserialize(input); @@ -96,8 +106,8 @@ public final class 
NoWriteFlushColumnMetadata extends FlushColumnMetadata { ArrayBackedValueStorage schemaStorage = new ArrayBackedValueStorage(serializedMetadata.getLength()); schemaStorage.append(serializedMetadata); return new NoWriteFlushColumnMetadata(datasetType, metaType, numPrimaryKeys, metaContainsKeys, - columnWriterFactory, multiPageOpRef, writers, fieldNamesDictionary, root, metaRoot, definitionLevels, - schemaStorage); + columnWriterFactory, multiPageOpRef, writers, requiredTemporaryBuffers, fieldNamesDictionary, root, + metaRoot, definitionLevels, schemaStorage); } public void close() { @@ -186,6 +196,8 @@ public final class NoWriteFlushColumnMetadata extends FlushColumnMetadata { if (columnIndex == numColumns) { numColumns++; } + boolean filtered = !primaryKey; + buffersRequiredByCurrentTuple += getBuffersRequiredByWriter(normalizedTypeTag, filtered); IColumnValuesWriter writer = NoOpColumnValuesWriter.INSTANCE; addColumn(columnIndex, writer); return new PrimitiveSchemaNode(columnIndex, normalizedTypeTag, primaryKey); @@ -195,6 +207,27 @@ public final class NoWriteFlushColumnMetadata extends FlushColumnMetadata { } } + private int getBuffersRequiredByWriter(ATypeTag typeTag, boolean filtered) { + return switch (typeTag) { + case MISSING, NULL -> NullMissingColumnValuesWriter.requiredTemporaryBuffers(filtered); + case BOOLEAN -> BooleanColumnValuesWriter.requiredTemporaryBuffers(filtered); + case TINYINT, SMALLINT, INTEGER, BIGINT -> LongColumnValuesWriter.requiredTemporaryBuffers(filtered); + case FLOAT -> FloatColumnValuesWriter.requiredTemporaryBuffers(filtered); + case DOUBLE -> DoubleColumnValuesWriter.requiredTemporaryBuffers(filtered); + case STRING -> StringColumnValuesWriter.requiredTemporaryBuffers(filtered); + case UUID -> UUIDColumnValuesWriter.requiredTemporaryBuffers(filtered); + default -> throw new UnsupportedOperationException(typeTag + " is not supported"); + }; + } + + public void resetBufferCount() { + buffersRequiredByCurrentTuple = 0; + } + + 
public int getRequiredTemporaryBuffersCountForCurrentTuple() { + return buffersRequiredByCurrentTuple; + } + @Override protected AbstractSchemaNode addDefinitionLevelsAndGet(AbstractSchemaNestedNode nestedNode) { return nestedNode; diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java index 2c7ae4d9bf..0ae1aea462 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java @@ -351,4 +351,13 @@ public class MergeColumnTupleWriter extends AbstractColumnTupleWriter { public void updateColumnMetadataForCurrentTuple(ITupleReference tuple) throws HyracksDataException { } + + @Override + public void resetTemporaryBufferForCurrentTuple() { + + } + + public int getRequiredTemporaryBuffersCountIncludingCurrentTuple() { + return columnMetadata.getRequiredTemporaryBuffersCount(); + } } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java index 9860822ec2..c02909cb5f 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java @@ -48,6 +48,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; public final class MergeColumnWriteMetadata extends AbstractColumnImmutableMetadata { private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef; private final List<IColumnValuesWriter> columnWriters; + private final int 
requiredTemporaryBuffersCount; private final List<IColumnTupleIterator> componentsTuples; /** @@ -55,10 +56,12 @@ public final class MergeColumnWriteMetadata extends AbstractColumnImmutableMetad */ private MergeColumnWriteMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys, Mutable<IColumnWriteMultiPageOp> multiPageOpRef, List<IColumnValuesWriter> columnWriters, - IValueReference serializedMetadata, List<IColumnTupleIterator> componentsTuples) { + int requiredTemporaryBuffersCount, IValueReference serializedMetadata, + List<IColumnTupleIterator> componentsTuples) { super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, columnWriters.size()); this.multiPageOpRef = multiPageOpRef; this.columnWriters = columnWriters; + this.requiredTemporaryBuffersCount = requiredTemporaryBuffersCount; this.componentsTuples = componentsTuples; } @@ -103,10 +106,14 @@ public final class MergeColumnWriteMetadata extends AbstractColumnImmutableMetad IColumnValuesWriterFactory writerFactory = new ColumnValuesWriterFactory(multiPageOpRef); List<IColumnValuesWriter> writers = new ArrayList<>(); - FlushColumnMetadata.deserializeWriters(input, writers, writerFactory); + int requiredTemporaryBuffers = FlushColumnMetadata.deserializeWriters(input, writers, writerFactory); return new MergeColumnWriteMetadata(datasetType, metaType, numberOfPrimaryKeys, multiPageOpRef, writers, - serializedMetadata, componentsTuples); + requiredTemporaryBuffers, serializedMetadata, componentsTuples); + } + + public int getRequiredTemporaryBuffersCount() { + return requiredTemporaryBuffersCount; } public List<IColumnTupleIterator> getComponentsTuples() { diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java index cb14d5115d..446719a57f 100644 --- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java @@ -134,4 +134,9 @@ public interface IColumnValuesWriter extends IValuesWriter { * @param output destination to which the writer should be serialized to */ void serialize(DataOutput output) throws IOException; + + /** + * Get the number of temporary buffers needed by this writer + */ + int getRequiredTemporaryBuffersCount(); } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java index accf4a0c03..045b3ede20 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java @@ -46,6 +46,7 @@ public abstract class AbstractColumnValuesWriter implements IColumnValuesWriter protected final AbstractColumnFilterWriter filterWriter; protected final ParquetRunLengthBitPackingHybridEncoder definitionLevels; protected final int level; + protected final boolean filtered; private final int columnIndex; private final boolean collection; @@ -60,6 +61,7 @@ public abstract class AbstractColumnValuesWriter implements IColumnValuesWriter nullBitMask = ColumnValuesUtil.getNullMask(level); int width = ColumnValuesUtil.getBitWidth(level); definitionLevels = new ParquetRunLengthBitPackingHybridEncoder(width); + this.filtered = filtered; this.filterWriter = filtered ? 
createFilter() : NoOpColumnFilterWriter.INSTANCE; } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java index 7d50cb16e1..b71f802def 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java @@ -89,4 +89,13 @@ public final class BooleanColumnValuesWriter extends AbstractColumnValuesWriter protected ATypeTag getTypeTag() { return ATypeTag.BOOLEAN; } + + @Override + public int getRequiredTemporaryBuffersCount() { + return requiredTemporaryBuffers(filtered); + } + + public static int requiredTemporaryBuffers(boolean filtered) { + return 0; + } } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java index 9e6f90656f..bf2daabd7d 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java @@ -117,4 +117,13 @@ public final class DoubleColumnValuesWriter extends AbstractColumnValuesWriter { protected ATypeTag getTypeTag() { return ATypeTag.DOUBLE; } + + @Override + public int getRequiredTemporaryBuffersCount() { + return requiredTemporaryBuffers(filtered); + } + + public static int requiredTemporaryBuffers(boolean filtered) { + return ParquetPlainFixedLengthValuesWriter.REQUIRED_TEMPORARY_BUFFERS; + } } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java index 39abcad244..1845c18ed8 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java @@ -117,4 +117,13 @@ public final class FloatColumnValuesWriter extends AbstractColumnValuesWriter { protected ATypeTag getTypeTag() { return ATypeTag.FLOAT; } + + @Override + public int getRequiredTemporaryBuffersCount() { + return requiredTemporaryBuffers(filtered); + } + + public static int requiredTemporaryBuffers(boolean filtered) { + return ParquetPlainFixedLengthValuesWriter.REQUIRED_TEMPORARY_BUFFERS; + } } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java index 516f56d634..1efa16475f 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java @@ -38,7 +38,7 @@ import org.apache.hyracks.data.std.primitive.ShortPointable; import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp; import org.apache.parquet.bytes.BytesInput; -final class LongColumnValuesWriter extends AbstractColumnValuesWriter { +public final class LongColumnValuesWriter extends AbstractColumnValuesWriter { private final AbstractParquetValuesWriter longWriter; private final ATypeTag typeTag; @@ -119,4 +119,17 @@ final class LongColumnValuesWriter extends AbstractColumnValuesWriter { protected ATypeTag getTypeTag() { return typeTag; } + + @Override + public int getRequiredTemporaryBuffersCount() { + return requiredTemporaryBuffers(filtered); + } + + public 
static int requiredTemporaryBuffers(boolean filtered) { + if (filtered) { + return ParquetDeltaBinaryPackingValuesWriterForLong.REQUIRED_TEMPORARY_BUFFERS; + } else { + return ParquetPlainFixedLengthValuesWriter.REQUIRED_TEMPORARY_BUFFERS; + } + } } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NoOpColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NoOpColumnValuesWriter.java index e774e9c00f..cd1ce6ba96 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NoOpColumnValuesWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NoOpColumnValuesWriter.java @@ -121,4 +121,13 @@ public class NoOpColumnValuesWriter implements IColumnValuesWriter { public void serialize(DataOutput output) throws IOException { } + + @Override + public int getRequiredTemporaryBuffersCount() { + return requiredTemporaryBuffers(false); + } + + public static int requiredTemporaryBuffers(boolean filtered) { + return 0; + } } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java index 2d9f5bf152..6385756140 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java @@ -104,4 +104,13 @@ public class NullMissingColumnValuesWriter extends AbstractColumnValuesWriter { public RunLengthIntArray getDefinitionLevelsIntArray() { return defLevelsIntArray; } + + @Override + public int getRequiredTemporaryBuffersCount() { + return requiredTemporaryBuffers(false); + } + + public static int requiredTemporaryBuffers(boolean filtered) { + return 0; + } } diff 
--git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java index 5b1977f5e9..6d1ce0adcc 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java @@ -103,4 +103,16 @@ public class StringColumnValuesWriter extends AbstractColumnValuesWriter { return ATypeTag.STRING; } + @Override + public int getRequiredTemporaryBuffersCount() { + return requiredTemporaryBuffers(filtered); + } + + public static int requiredTemporaryBuffers(boolean filtered) { + if (filtered) { + return ParquetDeltaByteArrayWriter.REQUIRED_TEMPORARY_BUFFERS; + } else { + return ParquetPlainVariableLengthValuesWriter.REQUIRED_TEMPORARY_BUFFERS; + } + } } diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java index 9d4ff9a523..1eafe288a1 100644 --- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java +++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java @@ -25,7 +25,7 @@ import org.apache.asterix.om.types.ATypeTag; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp; -final class UUIDColumnValuesWriter extends StringColumnValuesWriter { +public final class UUIDColumnValuesWriter extends StringColumnValuesWriter { public UUIDColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level, boolean collection, boolean filtered) { @@ -42,4 +42,13 @@ final class 
UUIDColumnValuesWriter extends StringColumnValuesWriter { protected ATypeTag getTypeTag() { return ATypeTag.UUID; } + + @Override + public int getRequiredTemporaryBuffersCount() { + return requiredTemporaryBuffers(filtered); + } + + public static int requiredTemporaryBuffers(boolean filtered) { + return ParquetPlainFixedLengthValuesWriter.REQUIRED_TEMPORARY_BUFFERS; + } } diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java index d287417f48..51a515bf12 100644 --- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java +++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java @@ -148,6 +148,11 @@ public class DummyColumnValuesWriter implements IColumnValuesWriter { } + @Override + public int getRequiredTemporaryBuffersCount() { + return 0; + } + @Override public RunLengthIntArray getDefinitionLevelsIntArray() { return definitionLevels; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java index 435d8c2596..c01d6f983e 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java @@ -27,8 +27,8 @@ import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNI import static org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER; import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER; import static org.apache.hyracks.control.common.config.OptionTypes.STRING; -import static 
org.apache.hyracks.control.common.config.OptionTypes.getRangedDoubleType; import static org.apache.hyracks.control.common.config.OptionTypes.getRangedIntegerType; +import static org.apache.hyracks.control.common.config.OptionTypes.getRangedLongByteUnit; import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE; import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE; @@ -76,7 +76,14 @@ public class StorageProperties extends AbstractProperties { STORAGE_PARTITIONS_COUNT(INTEGER, 8), STORAGE_MAX_COMPONENT_SIZE(LONG_BYTE_UNIT, StorageUtil.getLongSizeInBytes(1, StorageUtil.StorageUnit.TERABYTE)), STORAGE_COLUMN_BUFFER_SIZE(INTEGER_BYTE_UNIT, StorageUtil.getIntSizeInBytes(4, KILOBYTE)), - STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE(getRangedDoubleType(0.0, 100.0), 3.0d), + STORAGE_COLUMN_BUFFER_POOL_MAX_MEMORY( + getRangedLongByteUnit(0, MAX_HEAP_BYTES), + (Function<IApplicationConfig, Long>) config -> { + if (MAX_HEAP_BYTES < SMALL_JVM_HEAP_SIZE) { + return StorageUtil.getLongSizeInBytes(100, StorageUtil.StorageUnit.MEGABYTE); + } + return (long) (0.03 * MAX_HEAP_BYTES); + }), STORAGE_COLUMN_BUFFER_POOL_MAX_SIZE(getRangedIntegerType(1, 50000), 8000), STORAGE_COLUMN_BUFFER_ACQUIRE_TIMEOUT(LONG, TimeUnit.SECONDS.toMillis(120)); @@ -88,6 +95,11 @@ public class StorageProperties extends AbstractProperties { this.defaultValue = defaultValue; } + <T> Option(IOptionType<T> interpreter, Function<IApplicationConfig, T> defaultOption) { + this.interpreter = interpreter; + this.defaultValue = defaultOption; + } + @Override public Section section() { switch (this) { @@ -102,10 +114,6 @@ public class StorageProperties extends AbstractProperties { case STORAGE_COLUMN_MAX_TUPLE_COUNT: case STORAGE_COLUMN_FREE_SPACE_TOLERANCE: case STORAGE_COLUMN_MAX_LEAF_NODE_SIZE: - case STORAGE_COLUMN_BUFFER_SIZE: - case STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE: - case STORAGE_COLUMN_BUFFER_POOL_MAX_SIZE: - case STORAGE_COLUMN_BUFFER_ACQUIRE_TIMEOUT: return 
Section.COMMON; default: return Section.NC; @@ -176,7 +184,7 @@ public class StorageProperties extends AbstractProperties { return "The resultant disk component after a merge must not exceed the specified maximum size."; case STORAGE_COLUMN_BUFFER_SIZE: return "The size in bytes of each buffer in the column buffer pool."; - case STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE: + case STORAGE_COLUMN_BUFFER_POOL_MAX_MEMORY: return "The percentage of the JVM memory allocated to the column buffer pool. This controls the number of the column buffer relative to the total JVM memory."; case STORAGE_COLUMN_BUFFER_POOL_MAX_SIZE: return "The maximum number of buffers in the column buffer pool."; @@ -209,6 +217,7 @@ public class StorageProperties extends AbstractProperties { } public static final long MAX_HEAP_BYTES = Runtime.getRuntime().maxMemory(); + public static final long SMALL_JVM_HEAP_SIZE = StorageUtil.getByteValue("8 GB"); private static final int SYSTEM_RESERVED_DATASETS = 0; public StorageProperties(PropertiesAccessor accessor) { @@ -256,8 +265,8 @@ public class StorageProperties extends AbstractProperties { return accessor.getInt(Option.STORAGE_COLUMN_BUFFER_SIZE); } - public double getColumnBufferPoolMemoryPercentage() { - return accessor.getDouble(Option.STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE); + public long getColumnBufferPoolMaxMemory() { + return accessor.getLong(Option.STORAGE_COLUMN_BUFFER_POOL_MAX_MEMORY); } public int getColumnBufferPoolMaxSize() { @@ -284,10 +293,8 @@ public class StorageProperties extends AbstractProperties { int maxConcurrentFlushes = getMaxConcurrentFlushes(numPartitions); int writeBufferSize = runtimeContext.getCloudProperties().getWriteBufferSize(); - double columnBufferPoolSizePercentage = - accessor.getDouble(Option.STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE); - long maxMemoryForColumnBuffers = (long) (MAX_HEAP_BYTES * (columnBufferPoolSizePercentage / 100)); - jobExecutionMemory -= maxMemoryForColumnBuffers; + long 
columnBufferPoolMaxMemory = getColumnBufferPoolMaxMemory(); + jobExecutionMemory -= columnBufferPoolMaxMemory; jobExecutionMemory -= (long) (maxConcurrentFlushes + maxConcurrentMerges) * writeBufferSize; } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java index 08a1dec947..a367324395 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java @@ -69,7 +69,8 @@ public class RuntimeContext { this.ioManager = appCtx.getIoManager(); bufferCache = new BufferCache(ioManager, prs, new DelayPageCleanerPolicy(1000), fileMapManager, 100, 10, threadFactory, new HashMap<>(), DefaultBufferCacheReadContextProvider.DEFAULT); - columnBufferPool = new ColumnBufferPool(4 * 1024, 500, 3.0, TimeUnit.MINUTES.toMillis(2)); + columnBufferPool = new ColumnBufferPool(4 * 1024, 500, (long) (0.03 * Runtime.getRuntime().maxMemory()), + TimeUnit.MINUTES.toMillis(2)); ILocalResourceRepositoryFactory localResourceRepositoryFactory = new TransientLocalResourceRepositoryFactory(); localResourceRepository = localResourceRepositoryFactory.createRepository(); resourceIdFactory = (new ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java index 
fa1d9c4167..43d6ad696b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java @@ -77,6 +77,11 @@ public abstract class AbstractColumnTupleWriter extends AbstractTupleWriterDisab */ public abstract int getPrimaryKeysEstimatedSize(); + /** + * Reset the temporary buffer for the current tuple + */ + public abstract void resetTemporaryBufferForCurrentTuple(); + /** * Writes the tuple into a temporary internal buffers * @@ -125,4 +130,5 @@ public abstract class AbstractColumnTupleWriter extends AbstractTupleWriterDisab } } + public abstract int getRequiredTemporaryBuffersCountIncludingCurrentTuple(); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java index 0769824425..916b48c420 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java @@ -119,14 +119,11 @@ public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements I writeFullLeafPage(); confiscateNewLeafPage(); } - if (tupleCount == 0) { - //Since we are writing the first tuple, we need to estimate the number of columns. 
- columnWriter.updateColumnMetadataForCurrentTuple(tuple); - } //Save the key of the last inserted tuple setMinMaxKeys(tuple); columnWriter.writeTuple(tuple); tupleCount++; + columnWriter.resetTemporaryBufferForCurrentTuple(); } /** @@ -136,9 +133,9 @@ public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements I private void ensureWritersInitialized() throws HyracksDataException { if (!writerInitialized) { writerInitialized = true; - int numberOfColumns = columnWriter.getAbsoluteNumberOfColumns(true); - reserveBuffers(numberOfColumns); - reservedBufferCount = numberOfColumns; + int requiredBuffers = columnWriter.getRequiredTemporaryBuffersCountIncludingCurrentTuple(); + reserveBuffers(requiredBuffers); + reservedBufferCount = requiredBuffers; columnWriter.init(this); } } @@ -183,8 +180,8 @@ public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements I return true; } if (canCurrentTupleFit(tuple)) { - int requiredColumns = columnWriter.getAbsoluteNumberOfColumns(true); - return !ensureSufficientBuffers(requiredColumns); + int requiredBuffers = columnWriter.getRequiredTemporaryBuffersCountIncludingCurrentTuple(); + return !ensureSufficientBuffers(requiredBuffers); } // Not enough space for the tuple. return true; @@ -239,8 +236,8 @@ public final class ColumnBTreeBulkloader extends BTreeNSMBulkLoader implements I * @return false, as the first tuple always fits after setup. 
*/ private boolean handleFirstTuple() throws HyracksDataException { - int requiredColumns = columnWriter.getAbsoluteNumberOfColumns(true); - int additionalBuffersNeeded = requiredColumns - reservedBufferCount; + int requiredBuffers = columnWriter.getRequiredTemporaryBuffersCountIncludingCurrentTuple(); + int additionalBuffersNeeded = requiredBuffers - reservedBufferCount; if (additionalBuffersNeeded > 0) { reserveBuffers(additionalBuffersNeeded); reservedBufferCount += additionalBuffersNeeded; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ColumnBufferPool.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ColumnBufferPool.java index 5a15f12978..35c484d8cd 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ColumnBufferPool.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ColumnBufferPool.java @@ -25,9 +25,9 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,20 +36,21 @@ import org.apache.logging.log4j.Logger; * * NOTE: getBuffer() and recycle() are called very frequently and are designed to be as lightweight as possible. 
*/ -public class ColumnBufferPool implements IColumnBufferPool, ILifeCycleComponent { +public class ColumnBufferPool implements IColumnBufferPool { protected static final Logger LOGGER = LogManager.getLogger(); private final BlockingQueue<ByteBuffer> bufferPool; private final AtomicLong totalAllocatedMemoryInBytes; private final AtomicLong totalPooledMemoryInBytes; - private final AtomicLong numAllocNew; + private final AtomicInteger numAllocNew; + private final AtomicInteger buffersAllocated; // Semaphore for buffer-based allocation private final Semaphore bufferSemaphore; - private final long maxBufferPoolMemoryLimit; - private final double columnBufferPoolMemoryPercentage; + private final long bufferPoolMemoryLimit; + private final long columnBufferPoolMaxMemory; private final int columnBufferInBytes; - private final int columnBufferPoolMaxSize; + private final int columnBufferPoolCapacity; private final int maxBuffers; // Timeout for buffer reservation in milliseconds @@ -58,45 +59,43 @@ public class ColumnBufferPool implements IColumnBufferPool, ILifeCycleComponent /** * @param columnBufferInBytes buffer size in bytes * @param columnBufferPoolMaxSize max number of buffers in pool - * @param columnBufferPoolMemoryPercentage max percentage of total memory for pool + * @param columnBufferPoolMaxMemory max memory for pool * @param reserveTimeoutMillis timeout in milliseconds for buffer reservation */ - public ColumnBufferPool(int columnBufferInBytes, int columnBufferPoolMaxSize, - double columnBufferPoolMemoryPercentage, long reserveTimeoutMillis) { + public ColumnBufferPool(int columnBufferInBytes, int columnBufferPoolMaxSize, long columnBufferPoolMaxMemory, + long reserveTimeoutMillis) { this.totalAllocatedMemoryInBytes = new AtomicLong(0); - this.bufferPool = new ArrayBlockingQueue<>(columnBufferPoolMaxSize); - this.columnBufferPoolMemoryPercentage = columnBufferPoolMemoryPercentage; - this.columnBufferPoolMaxSize = columnBufferPoolMaxSize; + 
this.columnBufferPoolMaxMemory = columnBufferPoolMaxMemory; this.reserveTimeoutMillis = reserveTimeoutMillis; - this.maxBufferPoolMemoryLimit = getMaxBufferPoolMemoryLimit(columnBufferInBytes, - columnBufferPoolMemoryPercentage, columnBufferPoolMaxSize); - this.maxBuffers = (int) (maxBufferPoolMemoryLimit / columnBufferInBytes); + this.bufferPoolMemoryLimit = + getMaxBufferPoolMemoryLimit(columnBufferInBytes, columnBufferPoolMaxMemory, columnBufferPoolMaxSize); + this.maxBuffers = (int) (bufferPoolMemoryLimit / columnBufferInBytes); this.bufferSemaphore = new Semaphore(maxBuffers, true); this.columnBufferInBytes = columnBufferInBytes; this.totalPooledMemoryInBytes = new AtomicLong(0); - this.numAllocNew = new AtomicLong(0); - initializePool(); + this.buffersAllocated = new AtomicInteger(0); + this.numAllocNew = new AtomicInteger(0); + this.columnBufferPoolCapacity = Math.min(maxBuffers, columnBufferPoolMaxSize); + this.bufferPool = new ArrayBlockingQueue<>(columnBufferPoolCapacity); + initializePool(columnBufferPoolCapacity); LOGGER.info( - "ColumnBufferPool initialized: columnBufferPoolMaxSize={}, maxBufferPoolMemoryLimit={}, maxBuffers={}, columnBufferInBytes={}, reserveTimeoutMillis={}", - columnBufferPoolMaxSize, maxBufferPoolMemoryLimit, maxBuffers, columnBufferInBytes, - reserveTimeoutMillis); + "ColumnBufferPool initialized: columnBufferPoolMaxSize={}, maxBufferPoolMemoryLimit={}, maxBuffers={}, columnBufferPoolCapacity={}, columnBufferInBytes={}, reserveTimeoutMillis={}", + columnBufferPoolMaxSize, columnBufferPoolMaxMemory, maxBuffers, columnBufferPoolCapacity, + columnBufferInBytes, reserveTimeoutMillis); } /** * Pre-allocate buffers for fast buffer access. * The number of buffers pre-allocated is the minimum of half the pool size and the memory limit. 
*/ - private void initializePool() { - int halfPoolSize = columnBufferPoolMaxSize / 2; - int numBuffersToAllocate = Math.min(halfPoolSize, maxBuffers); - - for (int i = 0; i < numBuffersToAllocate; i++) { + private void initializePool(int bufferPoolCapacity) { + for (int i = 0; i < bufferPoolCapacity / 2; i++) { ByteBuffer buffer = ByteBuffer.allocate(columnBufferInBytes); bufferPool.add(buffer); totalPooledMemoryInBytes.addAndGet(columnBufferInBytes); } - LOGGER.info("ColumnBufferPool pre-allocated {} buffers ({} bytes each)", numBuffersToAllocate, - columnBufferInBytes); + LOGGER.info("ColumnBufferPool pre-allocated {} buffers having bufferPoolSize {} ({} bytes each)", + bufferPoolCapacity / 2, bufferPoolCapacity, columnBufferInBytes); } /** @@ -112,8 +111,15 @@ public class ColumnBufferPool implements IColumnBufferPool, ILifeCycleComponent } boolean acquired = bufferSemaphore.tryAcquire(requestedBuffers, reserveTimeoutMillis, TimeUnit.MILLISECONDS); if (!acquired) { - LOGGER.error("Failed to reserve {} buffers within {} ms ({} sec)", requestedBuffers, reserveTimeoutMillis, - TimeUnit.MILLISECONDS.toSeconds(reserveTimeoutMillis)); + LOGGER.error( + "Failed to reserve {} buffers within {} ms ({} sec); " + + "totalAllocatedMemoryInBytes={}, maxBufferPoolMemoryLimit={}, bufferPoolCapacity={}, columnBufferInBytes={}, " + + "columnBufferPoolMaxMemory={}, numAllocNew={}, availableBuffers={}. 
" + + "Consider increasing storageColumnBufferPoolMaxMemory (current: {}).", + requestedBuffers, reserveTimeoutMillis, TimeUnit.MILLISECONDS.toSeconds(reserveTimeoutMillis), + totalAllocatedMemoryInBytes.get(), bufferPoolMemoryLimit, columnBufferPoolCapacity, + columnBufferInBytes, columnBufferPoolMaxMemory, numAllocNew.get(), + bufferSemaphore.availablePermits(), columnBufferPoolMaxMemory); throw new IllegalStateException("Timeout while reserving column buffers (" + requestedBuffers + ") after " + reserveTimeoutMillis + " ms"); } @@ -140,6 +146,7 @@ public class ColumnBufferPool implements IColumnBufferPool, ILifeCycleComponent */ @Override public ByteBuffer getBuffer() { + buffersAllocated.incrementAndGet(); // Fast path: try to poll from pool ByteBuffer buffer = bufferPool.poll(); if (buffer != null) { @@ -165,7 +172,7 @@ public class ColumnBufferPool implements IColumnBufferPool, ILifeCycleComponent if (buffer == null) { throw new IllegalStateException("buffer is null"); } - + buffersAllocated.decrementAndGet(); // Try to return to pool; if full, discard if (bufferPool.offer(buffer)) { totalPooledMemoryInBytes.addAndGet(columnBufferInBytes); @@ -184,28 +191,26 @@ public class ColumnBufferPool implements IColumnBufferPool, ILifeCycleComponent * Throws if the limit would be exceeded. */ private void ensureAvailableQuota() { - long spaceAcquiredByPool = (long) columnBufferPoolMaxSize * columnBufferInBytes; - long totalAllocated = totalAllocatedMemoryInBytes.get(); - long totalIfAllocated = totalAllocated + spaceAcquiredByPool; - if (totalIfAllocated > maxBufferPoolMemoryLimit) { + if (buffersAllocated.get() > maxBuffers) { + long spaceAcquiredByPool = (long) columnBufferPoolCapacity * columnBufferInBytes; + long totalAllocated = totalAllocatedMemoryInBytes.get(); + long totalIfAllocated = totalAllocated + spaceAcquiredByPool; LOGGER.error( "Cannot allocate more buffers, memory limit reached. 
" - + "totalAllocatedMemoryInBytes={}, maxBufferPoolMemoryLimit={}, columnBufferPoolMaxSize={}, columnBufferInBytes={}, " - + "columnBufferPoolMemoryPercentage={}, totalIfAllocated={}, numAllocNew={}, availableBuffers={}. " - + "Consider increasing storageColumnBufferPoolMemoryPercentage (current: {}).", - totalAllocated, maxBufferPoolMemoryLimit, columnBufferPoolMaxSize, columnBufferInBytes, - columnBufferPoolMemoryPercentage, totalIfAllocated, numAllocNew.get(), - bufferSemaphore.availablePermits(), columnBufferPoolMemoryPercentage); - throw new IllegalStateException("Cannot allocate more buffers, maxBufferPoolMemoryLimit (" - + maxBufferPoolMemoryLimit + ") reached."); + + "totalAllocatedMemoryInBytes={}, maxBufferPoolMemoryLimit={}, columnBufferPoolCapacity={}, columnBufferInBytes={}, " + + "columnBufferPoolMaxMemory={}, totalIfAllocated={}, numAllocNew={}, availableBuffers={}. " + + "Consider increasing storageColumnBufferPoolMaxMemory (current: {}).", + totalAllocated, bufferPoolMemoryLimit, columnBufferPoolCapacity, columnBufferInBytes, + columnBufferPoolMaxMemory, totalIfAllocated, numAllocNew.get(), bufferSemaphore.availablePermits(), + columnBufferPoolMaxMemory); + throw new IllegalStateException( + "Cannot allocate more buffers, maxBufferPoolMemoryLimit (" + bufferPoolMemoryLimit + ") reached."); } } - private long getMaxBufferPoolMemoryLimit(int columnBufferInBytes, double columnBufferPoolMemoryPercentage, + private long getMaxBufferPoolMemoryLimit(int columnBufferInBytes, long columnBufferPoolMaxMemoryLimit, int columnBufferPoolMaxSize) { - long totalMemory = Runtime.getRuntime().totalMemory(); - return (long) Math.max(totalMemory * (columnBufferPoolMemoryPercentage / 100), - columnBufferInBytes * columnBufferPoolMaxSize); + return Math.max(columnBufferPoolMaxMemoryLimit, (long) columnBufferInBytes * columnBufferPoolMaxSize); } @Override @@ -225,7 +230,7 @@ public class ColumnBufferPool implements IColumnBufferPool, ILifeCycleComponent public void 
dumpState(OutputStream os) throws IOException { long pooledBytes = totalPooledMemoryInBytes.get(); long totalAllocatedBytes = totalAllocatedMemoryInBytes.get(); - String buffer = "ColumnBufferPool State:\n" + "columnBufferPoolMaxSize: " + columnBufferPoolMaxSize + "\n" + String buffer = "ColumnBufferPool State:\n" + "columnBufferPoolCapacity: " + columnBufferPoolCapacity + "\n" + "Max Buffers: " + maxBuffers + "\n" + "Total Pooled Memory (bytes): " + pooledBytes + "\n" + "Total Allocated Memory (bytes): " + totalAllocatedBytes + "\n" + "Number of Buffers Allocated New: " + numAllocNew.get() + "\n" + "Available Buffers: " + bufferSemaphore.availablePermits(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FreeColumnBufferPool.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FreeColumnBufferPool.java index e47478f4b0..43593f1e31 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FreeColumnBufferPool.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FreeColumnBufferPool.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.storage.common.buffercache; +import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; import org.apache.hyracks.util.StorageUtil; @@ -59,4 +61,19 @@ public class FreeColumnBufferPool implements IColumnBufferPool { public void close() { } + + @Override + public void start() { + + } + + @Override + public void dumpState(OutputStream os) throws IOException { + + } + + @Override + public void stop(boolean dumpState, OutputStream ouputStream) throws IOException { + + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IColumnBufferPool.java 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IColumnBufferPool.java index 9fc3027799..4040e5e1f9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IColumnBufferPool.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IColumnBufferPool.java @@ -20,6 +20,8 @@ package org.apache.hyracks.storage.common.buffercache; import java.nio.ByteBuffer; +import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; + /** * Interface for a buffer-based pool managing column buffer allocation. * @@ -29,7 +31,7 @@ import java.nio.ByteBuffer; * - Buffers are acquired via getBuffer() and returned via recycle(). * - The pool may block or fail to allocate if semaphore permits are exhausted. */ -public interface IColumnBufferPool { +public interface IColumnBufferPool extends ILifeCycleComponent { /** * Reserve the specified number of buffers, blocking if necessary until available. 
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java index 5788823a94..cdc36a3dd8 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java @@ -159,7 +159,8 @@ public class TestStorageManagerComponentHolder { return columnBufferPool; } - columnBufferPool = new ColumnBufferPool(4 * 1024, 500, 3.0, TimeUnit.MINUTES.toMillis(2)); + columnBufferPool = new ColumnBufferPool(4 * 1024, 500, (long) (0.03 * Runtime.getRuntime().maxMemory()), + TimeUnit.MINUTES.toMillis(2)); return columnBufferPool; } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/ColumnBufferPoolTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/ColumnBufferPoolTest.java index 0428b48e06..de95944197 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/ColumnBufferPoolTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/ColumnBufferPoolTest.java @@ -42,13 +42,13 @@ public class ColumnBufferPoolTest { private ColumnBufferPool columnBufferPool; private static final int COLUMN_BUFFER_GRANULE_BYTES = 4096; // 4KB private static final int POOL_SIZE = 10; - private static final double CAPPED_PERCENTAGE = 10.0; // 10% of runtime memory + private static final long COLUMN_BUFFER_MAX_MEMORY_LIMIT = getColumnBufferPoolMaxMemoryLimit(10); // 10% of runtime memory private 
static final long RESERVE_TIMEOUT_MILLIS = 5000; // 5 seconds timeout @Before public void setUp() { - columnBufferPool = - new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, CAPPED_PERCENTAGE, RESERVE_TIMEOUT_MILLIS); + columnBufferPool = new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, COLUMN_BUFFER_MAX_MEMORY_LIMIT, + RESERVE_TIMEOUT_MILLIS); } @After @@ -103,7 +103,7 @@ public class ColumnBufferPoolTest { public void testReserveTimeout() throws InterruptedException { // Use a shorter timeout for this test ColumnBufferPool shortTimeoutPool = - new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, CAPPED_PERCENTAGE, 100); // 100ms timeout + new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, COLUMN_BUFFER_MAX_MEMORY_LIMIT, 100); // 100ms timeout try { // Reserve all credits int maxCredits = shortTimeoutPool.getMaxReservedBuffers(); @@ -208,7 +208,7 @@ public class ColumnBufferPoolTest { public void testMultipleThreadsConcurrentExecution() throws InterruptedException { // Use shorter timeout to make timeouts happen faster in test ColumnBufferPool testPool = - new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, CAPPED_PERCENTAGE, 500); // 500ms timeout + new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, COLUMN_BUFFER_MAX_MEMORY_LIMIT, 500); // 500ms timeout // Helper for timestamped log java.util.function.Consumer<String> logWithTimestamp = msg -> { @@ -475,7 +475,7 @@ public class ColumnBufferPoolTest { @Test(expected = IllegalStateException.class) public void testMemoryQuotaEnforcement() throws InterruptedException { // Create a small pool to test quota enforcement - ColumnBufferPool smallPool = new ColumnBufferPool(1024, 2, 0.001, 1000); // Very small percentage + ColumnBufferPool smallPool = new ColumnBufferPool(1024, 2, getColumnBufferPoolMaxMemoryLimit(0.001), 1000); // Very small percentage try { int maxCredits = smallPool.getMaxReservedBuffers(); smallPool.reserve(maxCredits); @@ -511,4 +511,8 @@ 
public class ColumnBufferPoolTest { private int getMaxCredits() { return columnBufferPool.getMaxReservedBuffers(); } + + private static long getColumnBufferPoolMaxMemoryLimit(double percentage) { + return (long) ((percentage / 100) * Runtime.getRuntime().maxMemory()); + } }
