This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit b656c08860419c7e86c9c261cd37f7dfcc9f5db7
Author: Ritik Raj <[email protected]>
AuthorDate: Fri Aug 29 19:46:45 2025 +0530

    [ASTERIXDB-3636][STO] Fix buffer reservations underestimation
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    Previously, buffer reservation assumed each column required a
    single temporary buffer. However, when the column type is a
    non-primary string field, the system internally acquires three
    buffers per column, which was not accounted for. This led to
    under-reservation, and during leaf flush operations, more
    buffers were acquired than reserved, potentially exceeding the
    column buffer pool capacity.
    
    Additionally, unlike regular flushes where the maximum
    concurrency equals the number of compute partitions, in
    IndexBulkLoading, every storage partition on a node creates its
    own bulkLoader. This significantly increases buffer demand.
    
    For example:
        • A node with 128 storage partitions
        • A document with 80 columns (20% string fields)
    
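    Buffer requirement per bulk loader = (20% of 80 × 3) + (80% of 80 × 1)
    = 112 buffers, i.e. up to 112 × 128 = 14,336 buffers on the node if
    all storage partitions bulk-load concurrently.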
    
    This change updates buffer reservation logic to correctly
    account for string fields. Furthermore, for JVMs with < 8GiB
    memory, the column buffer pool is capped at 100MiB.
    
    Ext-ref: MB-68059
    Change-Id: I3bfb9e9ac4b908ee2e7d84cab4781f6e0f7444e8
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20283
    Reviewed-by: Murtadha Hubail <[email protected]>
    Reviewed-by: Michael Blow <[email protected]>
    Tested-by: Michael Blow <[email protected]>
---
 .../apache/asterix/app/nc/NCAppRuntimeContext.java | 11 ++-
 .../api/cluster_state_1/cluster_state_1.1.regexadm |  4 -
 .../cluster_state_1_full.1.regexadm                |  4 -
 .../cluster_state_1_less.1.regexadm                |  4 -
 ...tractParquetDeltaBinaryPackingValuesWriter.java |  2 +
 .../bytes/encoder/ParquetDeltaByteArrayWriter.java |  4 +
 .../ParquetDeltaLengthByteArrayValuesWriter.java   |  3 +
 .../ParquetPlainFixedLengthValuesWriter.java       |  2 +
 .../ParquetPlainVariableLengthValuesWriter.java    |  2 +
 .../operation/lsm/flush/FlushColumnMetadata.java   | 25 ++++--
 .../lsm/flush/FlushColumnTupleWriter.java          | 12 +++
 .../lsm/flush/NoWriteFlushColumnMetadata.java      | 45 ++++++++--
 .../lsm/merge/MergeColumnTupleWriter.java          |  9 ++
 .../lsm/merge/MergeColumnWriteMetadata.java        | 13 ++-
 .../asterix/column/values/IColumnValuesWriter.java |  5 ++
 .../values/writer/AbstractColumnValuesWriter.java  |  2 +
 .../values/writer/BooleanColumnValuesWriter.java   |  9 ++
 .../values/writer/DoubleColumnValuesWriter.java    |  9 ++
 .../values/writer/FloatColumnValuesWriter.java     |  9 ++
 .../values/writer/LongColumnValuesWriter.java      | 15 +++-
 .../values/writer/NoOpColumnValuesWriter.java      |  9 ++
 .../writer/NullMissingColumnValuesWriter.java      |  9 ++
 .../values/writer/StringColumnValuesWriter.java    | 12 +++
 .../values/writer/UUIDColumnValuesWriter.java      | 11 ++-
 .../values/writer/DummyColumnValuesWriter.java     |  5 ++
 .../asterix/common/config/StorageProperties.java   | 33 +++++---
 .../examples/btree/helper/RuntimeContext.java      |  3 +-
 .../column/api/AbstractColumnTupleWriter.java      |  6 ++
 .../column/impls/btree/ColumnBTreeBulkloader.java  | 19 ++---
 .../common/buffercache/ColumnBufferPool.java       | 99 ++++++++++++----------
 .../common/buffercache/FreeColumnBufferPool.java   | 17 ++++
 .../common/buffercache/IColumnBufferPool.java      |  4 +-
 .../support/TestStorageManagerComponentHolder.java |  3 +-
 .../storage/common/ColumnBufferPoolTest.java       | 16 ++--
 34 files changed, 320 insertions(+), 115 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 9d80525f20..53fb3cb6bb 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -148,7 +148,7 @@ import org.apache.logging.log4j.Logger;
 
 public class NCAppRuntimeContext implements INcApplicationContext {
     private static final Logger LOGGER = LogManager.getLogger();
-    public static final double INVALID_BUFFER_POOL_MEMORY_PERCENTAGE = 0.0;
+    public static final long INVALID_BUFFER_POOL_MAX_MEMORY = 0;
 
     private ILSMMergePolicyFactory metadataMergePolicyFactory;
     private final INCServiceContext ncServiceContext;
@@ -330,14 +330,13 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
                     fileInfoMap, defaultContext);
         }
 
-        if (storageProperties.getColumnBufferPoolMemoryPercentage() <= 
INVALID_BUFFER_POOL_MEMORY_PERCENTAGE) {
-            LOGGER.info("Using FreeColumnBufferPool since column buffer pool 
size percentage is {}",
+        if (storageProperties.getColumnBufferPoolMaxMemory() <= 
INVALID_BUFFER_POOL_MAX_MEMORY) {
+            LOGGER.info("Using FreeColumnBufferPool since column buffer pool 
max memory is {}",
                     storageProperties.getColumnBufferSize());
             this.columnBufferPool = new FreeColumnBufferPool();
         } else {
             this.columnBufferPool = new 
ColumnBufferPool(storageProperties.getColumnBufferSize(),
-                    storageProperties.getColumnBufferPoolMaxSize(),
-                    storageProperties.getColumnBufferPoolMemoryPercentage(),
+                    storageProperties.getColumnBufferPoolMaxSize(), 
storageProperties.getColumnBufferPoolMaxMemory(),
                     storageProperties.getColumnBufferAcquireTimeout());
         }
 
@@ -363,7 +362,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         ILifeCycleComponentManager lccm = 
getServiceContext().getLifeCycleComponentManager();
         lccm.register((ILifeCycleComponent) virtualBufferCache);
         lccm.register((ILifeCycleComponent) bufferCache);
-        lccm.register((ILifeCycleComponent) columnBufferPool);
+        lccm.register(columnBufferPool);
         /*
          * LogManager must be stopped after RecoveryManager, 
DatasetLifeCycleManager, and ReplicationManager
          * to process any logs that might be generated during stopping these 
components
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 24f084d814..6c78c21b42 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -90,10 +90,6 @@
     "replication\.timeout" : 120,
     "ssl\.enabled" : false,
     "storage.buffercache.pagesize" : 32768,
-    "storage.column.buffer.acquire.timeout" : 120000,
-    "storage.column.buffer.pool.max.size" : 8000,
-    "storage.column.buffer.pool.memory.percentage" : 3.0,
-    "storage.column.buffer.size" : 4096,
     "storage.column.free.space.tolerance" : 0.15,
     "storage.column.max.leaf.node.size" : 10485760,
     "storage.column.max.tuple.count" : 15000,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 89aa04551b..c44e600a3b 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -90,10 +90,6 @@
     "replication\.timeout" : 120,
     "ssl\.enabled" : false,
     "storage.buffercache.pagesize" : 32768,
-    "storage.column.buffer.acquire.timeout" : 120000,
-    "storage.column.buffer.pool.max.size" : 8000,
-    "storage.column.buffer.pool.memory.percentage" : 3.0,
-    "storage.column.buffer.size" : 4096,
     "storage.column.free.space.tolerance" : 0.15,
     "storage.column.max.leaf.node.size" : 10485760,
     "storage.column.max.tuple.count" : 15000,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 1882daff53..10eab67e07 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -90,10 +90,6 @@
     "replication\.timeout" : 120,
     "ssl\.enabled" : false,
     "storage.buffercache.pagesize" : 32768,
-    "storage.column.buffer.acquire.timeout" : 120000,
-    "storage.column.buffer.pool.max.size" : 8000,
-    "storage.column.buffer.pool.memory.percentage" : 3.0,
-    "storage.column.buffer.size" : 4096,
     "storage.column.free.space.tolerance" : 0.15,
     "storage.column.max.leaf.node.size" : 10485760,
     "storage.column.max.tuple.count" : 15000,
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetDeltaBinaryPackingValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetDeltaBinaryPackingValuesWriter.java
index 3102063137..7adb1e1aad 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetDeltaBinaryPackingValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetDeltaBinaryPackingValuesWriter.java
@@ -38,6 +38,8 @@ public abstract class 
AbstractParquetDeltaBinaryPackingValuesWriter extends Abst
 
     public static final int DEFAULT_NUM_MINIBLOCKS = 4;
 
+    public static final int REQUIRED_TEMPORARY_BUFFERS = 1;
+
     protected final MultiTemporaryBufferBytesOutputStream outputStream;
 
     /**
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
index 4b09e4da21..b1c596c762 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
@@ -32,6 +32,10 @@ import 
org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
  * Re-implementation of {@link DeltaByteArrayWriter}
  */
 public class ParquetDeltaByteArrayWriter extends AbstractParquetValuesWriter {
+    public static final int REQUIRED_TEMPORARY_BUFFERS =
+            
ParquetDeltaBinaryPackingValuesWriterForInteger.REQUIRED_TEMPORARY_BUFFERS
+                    + 
ParquetDeltaLengthByteArrayValuesWriter.REQUIRED_TEMPORARY_BUFFERS;
+
     private static final IValueReference EMPTY_VALUE;
     private final ParquetDeltaBinaryPackingValuesWriterForInteger 
prefixLengthWriter;
     private final ParquetDeltaLengthByteArrayValuesWriter suffixWriter;
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaLengthByteArrayValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaLengthByteArrayValuesWriter.java
index afab48eb82..8c8221ab35 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaLengthByteArrayValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaLengthByteArrayValuesWriter.java
@@ -34,6 +34,9 @@ import org.apache.parquet.io.ParquetEncodingException;
  * Re-implementation of {@link DeltaLengthByteArrayValuesWriter}
  */
 public class ParquetDeltaLengthByteArrayValuesWriter extends 
AbstractParquetValuesWriter {
+    public static final int REQUIRED_TEMPORARY_BUFFERS =
+            
ParquetDeltaBinaryPackingValuesWriterForInteger.REQUIRED_TEMPORARY_BUFFERS + 1; 
// 1 for outputStream
+
     private final ParquetDeltaBinaryPackingValuesWriterForInteger lengthWriter;
     private final MultiTemporaryBufferBytesOutputStream outputStream;
     private final LittleEndianDataOutputStream out;
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
index 1dbaa03220..7df92b3ab9 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
@@ -35,6 +35,8 @@ import org.apache.parquet.io.ParquetEncodingException;
  * Re-implementation of {@link PlainValuesWriter}
  */
 public class ParquetPlainFixedLengthValuesWriter extends 
AbstractParquetValuesWriter {
+    public static final int REQUIRED_TEMPORARY_BUFFERS = 1; // 1 for output 
stream
+
     private final AbstractBytesOutputStream outputStream;
     private final ValueOutputStream out;
 
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java
index 96c8bdc72e..78390a8ad5 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java
@@ -32,6 +32,8 @@ import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.io.ParquetEncodingException;
 
 public class ParquetPlainVariableLengthValuesWriter extends 
AbstractParquetValuesWriter {
+    public static final int REQUIRED_TEMPORARY_BUFFERS = 1; // 1 for output 
stream
+
     private final GrowableBytesOutputStream offsetStream;
     private final AbstractBytesOutputStream valueStream;
     private final ValueOutputStream offsetWriterStream;
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
index 21ad11b3c6..1c93ebb727 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -88,11 +88,13 @@ public class FlushColumnMetadata extends 
AbstractColumnMetadata {
     private boolean changed;
     protected int level;
     protected int repeated;
+    protected int requiredTemporaryBuffersCount;
 
     public FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, 
List<List<String>> primaryKeys,
             List<Integer> keySourceIndicator, IColumnValuesWriterFactory 
columnWriterFactory,
             Mutable<IColumnWriteMultiPageOp> multiPageOpRef) throws 
HyracksDataException {
         super(datasetType, metaType, primaryKeys.size());
+        this.requiredTemporaryBuffersCount = 0;
         this.multiPageOpRef = multiPageOpRef;
         this.columnWriterFactory = columnWriterFactory;
         this.orderedColumns = new IntOpenHashSet();
@@ -130,10 +132,11 @@ public class FlushColumnMetadata extends 
AbstractColumnMetadata {
     public FlushColumnMetadata(ARecordType datasetType, ARecordType metaType, 
int numPrimaryKeys,
             boolean metaContainsKeys, IColumnValuesWriterFactory 
columnWriterFactory,
             Mutable<IColumnWriteMultiPageOp> multiPageOpRef, 
List<IColumnValuesWriter> columnWriters,
-            IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, 
ObjectSchemaNode metaRoot,
-            Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels,
+            int requiredTemporaryBuffers, IFieldNamesDictionary 
fieldNamesDictionary, ObjectSchemaNode root,
+            ObjectSchemaNode metaRoot, Map<AbstractSchemaNestedNode, 
RunLengthIntArray> definitionLevels,
             ArrayBackedValueStorage serializedMetadata) {
         super(datasetType, metaType, numPrimaryKeys);
+        this.requiredTemporaryBuffersCount = requiredTemporaryBuffers;
         this.multiPageOpRef = multiPageOpRef;
         this.columnWriterFactory = columnWriterFactory;
         this.orderedColumns = new IntOpenHashSet();
@@ -253,7 +256,7 @@ public class FlushColumnMetadata extends 
AbstractColumnMetadata {
 
         //ColumnWriter
         List<IColumnValuesWriter> writers = new ArrayList<>();
-        deserializeWriters(input, writers, columnWriterFactory);
+        int requiredTemporaryBuffers = deserializeWriters(input, writers, 
columnWriterFactory);
 
         //FieldNames
         IFieldNamesDictionary fieldNamesDictionary = 
AbstractFieldNamesDictionary.deserialize(input);
@@ -270,7 +273,8 @@ public class FlushColumnMetadata extends 
AbstractColumnMetadata {
         schemaStorage.append(serializedMetadata);
         logSchema(root, metaRoot, fieldNamesDictionary);
         return new FlushColumnMetadata(datasetType, metaType, numPrimaryKeys, 
metaContainsKeys, columnWriterFactory,
-                multiPageOpRef, writers, fieldNamesDictionary, root, metaRoot, 
definitionLevels, schemaStorage);
+                multiPageOpRef, writers, requiredTemporaryBuffers, 
fieldNamesDictionary, root, metaRoot,
+                definitionLevels, schemaStorage);
     }
 
     @Override
@@ -301,12 +305,17 @@ public class FlushColumnMetadata extends 
AbstractColumnMetadata {
         }
     }
 
-    public static void deserializeWriters(DataInput input, 
List<IColumnValuesWriter> writers,
+    public static int deserializeWriters(DataInput input, 
List<IColumnValuesWriter> writers,
             IColumnValuesWriterFactory columnWriterFactory) throws IOException 
{
+        int numberOfRequiredTemporaryBuffers = 0;
         int numberOfWriters = input.readInt();
         for (int i = 0; i < numberOfWriters; i++) {
-            writers.add(AbstractColumnValuesWriter.deserialize(input, 
columnWriterFactory));
+            IColumnValuesWriter writer = 
AbstractColumnValuesWriter.deserialize(input, columnWriterFactory);
+            numberOfRequiredTemporaryBuffers += 
writer.getRequiredTemporaryBuffersCount();
+            writers.add(writer);
         }
+
+        return numberOfRequiredTemporaryBuffers;
     }
 
     /* ********************************************************
@@ -578,6 +587,7 @@ public class FlushColumnMetadata extends 
AbstractColumnMetadata {
                 if (multiPageOpRef.getValue() != null) {
                     writer.reset();
                 }
+                requiredTemporaryBuffersCount += 
writer.getRequiredTemporaryBuffersCount();
                 addColumn(columnIndex, writer);
                 return new PrimitiveSchemaNode(columnIndex, normalizedTypeTag, 
primaryKey);
             default:
@@ -618,4 +628,7 @@ public class FlushColumnMetadata extends 
AbstractColumnMetadata {
         return metaContainsKeys;
     }
 
+    public int getRequiredTemporaryBuffersCount() {
+        return requiredTemporaryBuffersCount;
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index c0fd0493cb..56149bc205 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -221,6 +221,11 @@ public class FlushColumnTupleWriter extends 
AbstractColumnTupleWriter {
         transformerForCurrentTuple.transform(pointable);
     }
 
+    @Override
+    public void resetTemporaryBufferForCurrentTuple() {
+        columnMetadataWithCurrentTuple.resetBufferCount();
+    }
+
     @Override
     public void writeTuple(ITupleReference tuple) throws HyracksDataException {
         //This from an in-memory component, hence the cast
@@ -283,4 +288,11 @@ public class FlushColumnTupleWriter extends 
AbstractColumnTupleWriter {
     protected void writeMeta(LSMBTreeTupleReference btreeTuple) throws 
HyracksDataException {
         //NoOp
     }
+
+    @Override
+    public int getRequiredTemporaryBuffersCountIncludingCurrentTuple() {
+        int requiredBuffers = columnMetadata.getRequiredTemporaryBuffersCount()
+                + 
columnMetadataWithCurrentTuple.getRequiredTemporaryBuffersCountForCurrentTuple();
+        return requiredBuffers;
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
index 7d77a97b39..8c546e4573 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/NoWriteFlushColumnMetadata.java
@@ -40,7 +40,14 @@ import 
org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
 import org.apache.asterix.column.util.RunLengthIntArray;
 import org.apache.asterix.column.values.IColumnValuesWriter;
 import org.apache.asterix.column.values.IColumnValuesWriterFactory;
+import org.apache.asterix.column.values.writer.BooleanColumnValuesWriter;
+import org.apache.asterix.column.values.writer.DoubleColumnValuesWriter;
+import org.apache.asterix.column.values.writer.FloatColumnValuesWriter;
+import org.apache.asterix.column.values.writer.LongColumnValuesWriter;
 import org.apache.asterix.column.values.writer.NoOpColumnValuesWriter;
+import org.apache.asterix.column.values.writer.NullMissingColumnValuesWriter;
+import org.apache.asterix.column.values.writer.StringColumnValuesWriter;
+import org.apache.asterix.column.values.writer.UUIDColumnValuesWriter;
 import org.apache.asterix.om.dictionary.AbstractFieldNamesDictionary;
 import org.apache.asterix.om.dictionary.IFieldNamesDictionary;
 import org.apache.asterix.om.types.ARecordType;
@@ -59,14 +66,17 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 public final class NoWriteFlushColumnMetadata extends FlushColumnMetadata {
 
     private int numColumns;
+    private int buffersRequiredByCurrentTuple;
 
     public NoWriteFlushColumnMetadata(ARecordType datasetType, ARecordType 
metaType, int numPrimaryKeys,
             boolean metaContainsKeys, IColumnValuesWriterFactory 
columnWriterFactory,
             Mutable<IColumnWriteMultiPageOp> multiPageOpRef, 
List<IColumnValuesWriter> writers,
-            IFieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, 
ObjectSchemaNode metaRoot,
-            Map<AbstractSchemaNestedNode, RunLengthIntArray> definitionLevels, 
ArrayBackedValueStorage schemaStorage) {
+            int requiredTemporaryBuffers, IFieldNamesDictionary 
fieldNamesDictionary, ObjectSchemaNode root,
+            ObjectSchemaNode metaRoot, Map<AbstractSchemaNestedNode, 
RunLengthIntArray> definitionLevels,
+            ArrayBackedValueStorage schemaStorage) {
         super(datasetType, metaType, numPrimaryKeys, metaContainsKeys, 
columnWriterFactory, multiPageOpRef, writers,
-                fieldNamesDictionary, root, metaRoot, definitionLevels, 
schemaStorage);
+                requiredTemporaryBuffers, fieldNamesDictionary, root, 
metaRoot, definitionLevels, schemaStorage);
+        buffersRequiredByCurrentTuple = 0;
         numColumns = 0;
     }
 
@@ -80,7 +90,7 @@ public final class NoWriteFlushColumnMetadata extends 
FlushColumnMetadata {
 
         //ColumnWriter
         List<IColumnValuesWriter> writers = new ArrayList<>();
-        deserializeWriters(input, writers, columnWriterFactory);
+        int requiredTemporaryBuffers = deserializeWriters(input, writers, 
columnWriterFactory);
 
         //FieldNames
         IFieldNamesDictionary fieldNamesDictionary = 
AbstractFieldNamesDictionary.deserialize(input);
@@ -96,8 +106,8 @@ public final class NoWriteFlushColumnMetadata extends 
FlushColumnMetadata {
         ArrayBackedValueStorage schemaStorage = new 
ArrayBackedValueStorage(serializedMetadata.getLength());
         schemaStorage.append(serializedMetadata);
         return new NoWriteFlushColumnMetadata(datasetType, metaType, 
numPrimaryKeys, metaContainsKeys,
-                columnWriterFactory, multiPageOpRef, writers, 
fieldNamesDictionary, root, metaRoot, definitionLevels,
-                schemaStorage);
+                columnWriterFactory, multiPageOpRef, writers, 
requiredTemporaryBuffers, fieldNamesDictionary, root,
+                metaRoot, definitionLevels, schemaStorage);
     }
 
     public void close() {
@@ -186,6 +196,8 @@ public final class NoWriteFlushColumnMetadata extends 
FlushColumnMetadata {
                 if (columnIndex == numColumns) {
                     numColumns++;
                 }
+                boolean filtered = !primaryKey;
+                buffersRequiredByCurrentTuple += 
getBuffersRequiredByWriter(normalizedTypeTag, filtered);
                 IColumnValuesWriter writer = NoOpColumnValuesWriter.INSTANCE;
                 addColumn(columnIndex, writer);
                 return new PrimitiveSchemaNode(columnIndex, normalizedTypeTag, 
primaryKey);
@@ -195,6 +207,27 @@ public final class NoWriteFlushColumnMetadata extends 
FlushColumnMetadata {
         }
     }
 
+    private int getBuffersRequiredByWriter(ATypeTag typeTag, boolean filtered) 
{
+        return switch (typeTag) {
+            case MISSING, NULL -> 
NullMissingColumnValuesWriter.requiredTemporaryBuffers(filtered);
+            case BOOLEAN -> 
BooleanColumnValuesWriter.requiredTemporaryBuffers(filtered);
+            case TINYINT, SMALLINT, INTEGER, BIGINT -> 
LongColumnValuesWriter.requiredTemporaryBuffers(filtered);
+            case FLOAT -> 
FloatColumnValuesWriter.requiredTemporaryBuffers(filtered);
+            case DOUBLE -> 
DoubleColumnValuesWriter.requiredTemporaryBuffers(filtered);
+            case STRING -> 
StringColumnValuesWriter.requiredTemporaryBuffers(filtered);
+            case UUID -> 
UUIDColumnValuesWriter.requiredTemporaryBuffers(filtered);
+            default -> throw new UnsupportedOperationException(typeTag + " is 
not supported");
+        };
+    }
+
+    public void resetBufferCount() {
+        buffersRequiredByCurrentTuple = 0;
+    }
+
+    public int getRequiredTemporaryBuffersCountForCurrentTuple() {
+        return buffersRequiredByCurrentTuple;
+    }
+
     @Override
     protected AbstractSchemaNode 
addDefinitionLevelsAndGet(AbstractSchemaNestedNode nestedNode) {
         return nestedNode;
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index 2c7ae4d9bf..0ae1aea462 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -351,4 +351,13 @@ public class MergeColumnTupleWriter extends 
AbstractColumnTupleWriter {
 
     public void updateColumnMetadataForCurrentTuple(ITupleReference tuple) 
throws HyracksDataException {
     }
+
+    @Override
+    public void resetTemporaryBufferForCurrentTuple() {
+
+    }
+
+    public int getRequiredTemporaryBuffersCountIncludingCurrentTuple() {
+        return columnMetadata.getRequiredTemporaryBuffersCount();
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
index 9860822ec2..c02909cb5f 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
@@ -48,6 +48,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 public final class MergeColumnWriteMetadata extends 
AbstractColumnImmutableMetadata {
     private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
     private final List<IColumnValuesWriter> columnWriters;
+    private final int requiredTemporaryBuffersCount;
     private final List<IColumnTupleIterator> componentsTuples;
 
     /**
@@ -55,10 +56,12 @@ public final class MergeColumnWriteMetadata extends 
AbstractColumnImmutableMetad
      */
     private MergeColumnWriteMetadata(ARecordType datasetType, ARecordType 
metaType, int numberOfPrimaryKeys,
             Mutable<IColumnWriteMultiPageOp> multiPageOpRef, 
List<IColumnValuesWriter> columnWriters,
-            IValueReference serializedMetadata, List<IColumnTupleIterator> 
componentsTuples) {
+            int requiredTemporaryBuffersCount, IValueReference 
serializedMetadata,
+            List<IColumnTupleIterator> componentsTuples) {
         super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, 
columnWriters.size());
         this.multiPageOpRef = multiPageOpRef;
         this.columnWriters = columnWriters;
+        this.requiredTemporaryBuffersCount = requiredTemporaryBuffersCount;
         this.componentsTuples = componentsTuples;
     }
 
@@ -103,10 +106,14 @@ public final class MergeColumnWriteMetadata extends 
AbstractColumnImmutableMetad
 
         IColumnValuesWriterFactory writerFactory = new 
ColumnValuesWriterFactory(multiPageOpRef);
         List<IColumnValuesWriter> writers = new ArrayList<>();
-        FlushColumnMetadata.deserializeWriters(input, writers, writerFactory);
+        int requiredTemporaryBuffers = 
FlushColumnMetadata.deserializeWriters(input, writers, writerFactory);
 
         return new MergeColumnWriteMetadata(datasetType, metaType, 
numberOfPrimaryKeys, multiPageOpRef, writers,
-                serializedMetadata, componentsTuples);
+                requiredTemporaryBuffers, serializedMetadata, 
componentsTuples);
+    }
+
+    public int getRequiredTemporaryBuffersCount() {
+        return requiredTemporaryBuffersCount;
     }
 
     public List<IColumnTupleIterator> getComponentsTuples() {
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
index cb14d5115d..446719a57f 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
@@ -134,4 +134,9 @@ public interface IColumnValuesWriter extends IValuesWriter {
      * @param output destination to which the writer should be serialized to
      */
     void serialize(DataOutput output) throws IOException;
+
+    /**
+     * Get the number of temporary buffers needed by this writer
+     */
+    int getRequiredTemporaryBuffersCount();
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
index accf4a0c03..045b3ede20 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
@@ -46,6 +46,7 @@ public abstract class AbstractColumnValuesWriter implements 
IColumnValuesWriter
     protected final AbstractColumnFilterWriter filterWriter;
     protected final ParquetRunLengthBitPackingHybridEncoder definitionLevels;
     protected final int level;
+    protected final boolean filtered;
 
     private final int columnIndex;
     private final boolean collection;
@@ -60,6 +61,7 @@ public abstract class AbstractColumnValuesWriter implements 
IColumnValuesWriter
         nullBitMask = ColumnValuesUtil.getNullMask(level);
         int width = ColumnValuesUtil.getBitWidth(level);
         definitionLevels = new ParquetRunLengthBitPackingHybridEncoder(width);
+        this.filtered = filtered;
         this.filterWriter = filtered ? createFilter() : 
NoOpColumnFilterWriter.INSTANCE;
     }
 
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
index 7d50cb16e1..b71f802def 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
@@ -89,4 +89,13 @@ public final class BooleanColumnValuesWriter extends 
AbstractColumnValuesWriter
     protected ATypeTag getTypeTag() {
         return ATypeTag.BOOLEAN;
     }
+
+    @Override
+    public int getRequiredTemporaryBuffersCount() {
+        return requiredTemporaryBuffers(filtered);
+    }
+
+    public static int requiredTemporaryBuffers(boolean filtered) {
+        return 0;
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
index 9e6f90656f..bf2daabd7d 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
@@ -117,4 +117,13 @@ public final class DoubleColumnValuesWriter extends 
AbstractColumnValuesWriter {
     protected ATypeTag getTypeTag() {
         return ATypeTag.DOUBLE;
     }
+
+    @Override
+    public int getRequiredTemporaryBuffersCount() {
+        return requiredTemporaryBuffers(filtered);
+    }
+
+    public static int requiredTemporaryBuffers(boolean filtered) {
+        return ParquetPlainFixedLengthValuesWriter.REQUIRED_TEMPORARY_BUFFERS;
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
index 39abcad244..1845c18ed8 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/FloatColumnValuesWriter.java
@@ -117,4 +117,13 @@ public final class FloatColumnValuesWriter extends 
AbstractColumnValuesWriter {
     protected ATypeTag getTypeTag() {
         return ATypeTag.FLOAT;
     }
+
+    @Override
+    public int getRequiredTemporaryBuffersCount() {
+        return requiredTemporaryBuffers(filtered);
+    }
+
+    public static int requiredTemporaryBuffers(boolean filtered) {
+        return ParquetPlainFixedLengthValuesWriter.REQUIRED_TEMPORARY_BUFFERS;
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
index 516f56d634..1efa16475f 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
@@ -38,7 +38,7 @@ import org.apache.hyracks.data.std.primitive.ShortPointable;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
 import org.apache.parquet.bytes.BytesInput;
 
-final class LongColumnValuesWriter extends AbstractColumnValuesWriter {
+public final class LongColumnValuesWriter extends AbstractColumnValuesWriter {
     private final AbstractParquetValuesWriter longWriter;
     private final ATypeTag typeTag;
 
@@ -119,4 +119,17 @@ final class LongColumnValuesWriter extends 
AbstractColumnValuesWriter {
     protected ATypeTag getTypeTag() {
         return typeTag;
     }
+
+    @Override
+    public int getRequiredTemporaryBuffersCount() {
+        return requiredTemporaryBuffers(filtered);
+    }
+
+    public static int requiredTemporaryBuffers(boolean filtered) {
+        if (filtered) {
+            return 
ParquetDeltaBinaryPackingValuesWriterForLong.REQUIRED_TEMPORARY_BUFFERS;
+        } else {
+            return 
ParquetPlainFixedLengthValuesWriter.REQUIRED_TEMPORARY_BUFFERS;
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NoOpColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NoOpColumnValuesWriter.java
index e774e9c00f..cd1ce6ba96 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NoOpColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NoOpColumnValuesWriter.java
@@ -121,4 +121,13 @@ public class NoOpColumnValuesWriter implements 
IColumnValuesWriter {
     public void serialize(DataOutput output) throws IOException {
 
     }
+
+    @Override
+    public int getRequiredTemporaryBuffersCount() {
+        return requiredTemporaryBuffers(false);
+    }
+
+    public static int requiredTemporaryBuffers(boolean filtered) {
+        return 0;
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
index 2d9f5bf152..6385756140 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
@@ -104,4 +104,13 @@ public class NullMissingColumnValuesWriter extends 
AbstractColumnValuesWriter {
     public RunLengthIntArray getDefinitionLevelsIntArray() {
         return defLevelsIntArray;
     }
+
+    @Override
+    public int getRequiredTemporaryBuffersCount() {
+        return requiredTemporaryBuffers(false);
+    }
+
+    public static int requiredTemporaryBuffers(boolean filtered) {
+        return 0;
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
index 5b1977f5e9..6d1ce0adcc 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
@@ -103,4 +103,16 @@ public class StringColumnValuesWriter extends 
AbstractColumnValuesWriter {
         return ATypeTag.STRING;
     }
 
+    @Override
+    public int getRequiredTemporaryBuffersCount() {
+        return requiredTemporaryBuffers(filtered);
+    }
+
+    public static int requiredTemporaryBuffers(boolean filtered) {
+        if (filtered) {
+            return ParquetDeltaByteArrayWriter.REQUIRED_TEMPORARY_BUFFERS;
+        } else {
+            return 
ParquetPlainVariableLengthValuesWriter.REQUIRED_TEMPORARY_BUFFERS;
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
index 9d4ff9a523..1eafe288a1 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
@@ -25,7 +25,7 @@ import org.apache.asterix.om.types.ATypeTag;
 import org.apache.commons.lang3.mutable.Mutable;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
 
-final class UUIDColumnValuesWriter extends StringColumnValuesWriter {
+public final class UUIDColumnValuesWriter extends StringColumnValuesWriter {
 
     public UUIDColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> 
multiPageOpRef, int columnIndex, int level,
             boolean collection, boolean filtered) {
@@ -42,4 +42,13 @@ final class UUIDColumnValuesWriter extends 
StringColumnValuesWriter {
     protected ATypeTag getTypeTag() {
         return ATypeTag.UUID;
     }
+
+    @Override
+    public int getRequiredTemporaryBuffersCount() {
+        return requiredTemporaryBuffers(filtered);
+    }
+
+    public static int requiredTemporaryBuffers(boolean filtered) {
+        return ParquetPlainFixedLengthValuesWriter.REQUIRED_TEMPORARY_BUFFERS;
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
index d287417f48..51a515bf12 100644
--- 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
+++ 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/DummyColumnValuesWriter.java
@@ -148,6 +148,11 @@ public class DummyColumnValuesWriter implements 
IColumnValuesWriter {
 
     }
 
+    @Override
+    public int getRequiredTemporaryBuffersCount() {
+        return 0;
+    }
+
     @Override
     public RunLengthIntArray getDefinitionLevelsIntArray() {
         return definitionLevels;
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index 435d8c2596..c01d6f983e 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -27,8 +27,8 @@ import static 
org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNI
 import static 
org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
 import static 
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
-import static 
org.apache.hyracks.control.common.config.OptionTypes.getRangedDoubleType;
 import static 
org.apache.hyracks.control.common.config.OptionTypes.getRangedIntegerType;
+import static 
org.apache.hyracks.control.common.config.OptionTypes.getRangedLongByteUnit;
 import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE;
 import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE;
 
@@ -76,7 +76,14 @@ public class StorageProperties extends AbstractProperties {
         STORAGE_PARTITIONS_COUNT(INTEGER, 8),
         STORAGE_MAX_COMPONENT_SIZE(LONG_BYTE_UNIT, 
StorageUtil.getLongSizeInBytes(1, StorageUtil.StorageUnit.TERABYTE)),
         STORAGE_COLUMN_BUFFER_SIZE(INTEGER_BYTE_UNIT, 
StorageUtil.getIntSizeInBytes(4, KILOBYTE)),
-        STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE(getRangedDoubleType(0.0, 
100.0), 3.0d),
+        STORAGE_COLUMN_BUFFER_POOL_MAX_MEMORY(
+                getRangedLongByteUnit(0, MAX_HEAP_BYTES),
+                (Function<IApplicationConfig, Long>) config -> {
+                    if (MAX_HEAP_BYTES < SMALL_JVM_HEAP_SIZE) {
+                        return StorageUtil.getLongSizeInBytes(100, 
StorageUtil.StorageUnit.MEGABYTE);
+                    }
+                    return (long) (0.03 * MAX_HEAP_BYTES);
+                }),
         STORAGE_COLUMN_BUFFER_POOL_MAX_SIZE(getRangedIntegerType(1, 50000), 
8000),
         STORAGE_COLUMN_BUFFER_ACQUIRE_TIMEOUT(LONG, 
TimeUnit.SECONDS.toMillis(120));
 
@@ -88,6 +95,11 @@ public class StorageProperties extends AbstractProperties {
             this.defaultValue = defaultValue;
         }
 
+        <T> Option(IOptionType<T> interpreter, Function<IApplicationConfig, T> 
defaultOption) {
+            this.interpreter = interpreter;
+            this.defaultValue = defaultOption;
+        }
+
         @Override
         public Section section() {
             switch (this) {
@@ -102,10 +114,6 @@ public class StorageProperties extends AbstractProperties {
                 case STORAGE_COLUMN_MAX_TUPLE_COUNT:
                 case STORAGE_COLUMN_FREE_SPACE_TOLERANCE:
                 case STORAGE_COLUMN_MAX_LEAF_NODE_SIZE:
-                case STORAGE_COLUMN_BUFFER_SIZE:
-                case STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE:
-                case STORAGE_COLUMN_BUFFER_POOL_MAX_SIZE:
-                case STORAGE_COLUMN_BUFFER_ACQUIRE_TIMEOUT:
                     return Section.COMMON;
                 default:
                     return Section.NC;
@@ -176,7 +184,7 @@ public class StorageProperties extends AbstractProperties {
                     return "The resultant disk component after a merge must 
not exceed the specified maximum size.";
                 case STORAGE_COLUMN_BUFFER_SIZE:
                     return "The size in bytes of each buffer in the column 
buffer pool.";
-                case STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE:
+                case STORAGE_COLUMN_BUFFER_POOL_MAX_MEMORY:
                     return "The percentage of the JVM memory allocated to the 
column buffer pool. This controls the number of the column buffer relative to 
the total JVM memory.";
                 case STORAGE_COLUMN_BUFFER_POOL_MAX_SIZE:
                     return "The maximum number of buffers in the column buffer 
pool.";
@@ -209,6 +217,7 @@ public class StorageProperties extends AbstractProperties {
     }
 
     public static final long MAX_HEAP_BYTES = Runtime.getRuntime().maxMemory();
+    public static final long SMALL_JVM_HEAP_SIZE = StorageUtil.getByteValue("8 
GB");
     private static final int SYSTEM_RESERVED_DATASETS = 0;
 
     public StorageProperties(PropertiesAccessor accessor) {
@@ -256,8 +265,8 @@ public class StorageProperties extends AbstractProperties {
         return accessor.getInt(Option.STORAGE_COLUMN_BUFFER_SIZE);
     }
 
-    public double getColumnBufferPoolMemoryPercentage() {
-        return 
accessor.getDouble(Option.STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE);
+    public long getColumnBufferPoolMaxMemory() {
+        return accessor.getLong(Option.STORAGE_COLUMN_BUFFER_POOL_MAX_MEMORY);
     }
 
     public int getColumnBufferPoolMaxSize() {
@@ -284,10 +293,8 @@ public class StorageProperties extends AbstractProperties {
             int maxConcurrentFlushes = getMaxConcurrentFlushes(numPartitions);
             int writeBufferSize = 
runtimeContext.getCloudProperties().getWriteBufferSize();
 
-            double columnBufferPoolSizePercentage =
-                    
accessor.getDouble(Option.STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE);
-            long maxMemoryForColumnBuffers = (long) (MAX_HEAP_BYTES * 
(columnBufferPoolSizePercentage / 100));
-            jobExecutionMemory -= maxMemoryForColumnBuffers;
+            long columnBufferPoolMaxMemory = getColumnBufferPoolMaxMemory();
+            jobExecutionMemory -= columnBufferPoolMaxMemory;
             jobExecutionMemory -= (long) (maxConcurrentFlushes + 
maxConcurrentMerges) * writeBufferSize;
 
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index 08a1dec947..a367324395 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -69,7 +69,8 @@ public class RuntimeContext {
         this.ioManager = appCtx.getIoManager();
         bufferCache = new BufferCache(ioManager, prs, new 
DelayPageCleanerPolicy(1000), fileMapManager, 100, 10,
                 threadFactory, new HashMap<>(), 
DefaultBufferCacheReadContextProvider.DEFAULT);
-        columnBufferPool = new ColumnBufferPool(4 * 1024, 500, 3.0, 
TimeUnit.MINUTES.toMillis(2));
+        columnBufferPool = new ColumnBufferPool(4 * 1024, 500, (long) (0.03 * 
Runtime.getRuntime().maxMemory()),
+                TimeUnit.MINUTES.toMillis(2));
         ILocalResourceRepositoryFactory localResourceRepositoryFactory = new 
TransientLocalResourceRepositoryFactory();
         localResourceRepository = 
localResourceRepositoryFactory.createRepository();
         resourceIdFactory = (new 
ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
index fa1d9c4167..43d6ad696b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
@@ -77,6 +77,11 @@ public abstract class AbstractColumnTupleWriter extends 
AbstractTupleWriterDisab
      */
     public abstract int getPrimaryKeysEstimatedSize();
 
+    /**
+     * Reset the temporary buffer for the current tuple
+     */
+    public abstract void resetTemporaryBufferForCurrentTuple();
+
     /**
      * Writes the tuple into a temporary internal buffers
      *
@@ -125,4 +130,5 @@ public abstract class AbstractColumnTupleWriter extends 
AbstractTupleWriterDisab
         }
     }
 
+    public abstract int 
getRequiredTemporaryBuffersCountIncludingCurrentTuple();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index 0769824425..916b48c420 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -119,14 +119,11 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
             writeFullLeafPage();
             confiscateNewLeafPage();
         }
-        if (tupleCount == 0) {
-            //Since we are writing the first tuple, we need to estimate the 
number of columns.
-            columnWriter.updateColumnMetadataForCurrentTuple(tuple);
-        }
         //Save the key of the last inserted tuple
         setMinMaxKeys(tuple);
         columnWriter.writeTuple(tuple);
         tupleCount++;
+        columnWriter.resetTemporaryBufferForCurrentTuple();
     }
 
     /**
@@ -136,9 +133,9 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
     private void ensureWritersInitialized() throws HyracksDataException {
         if (!writerInitialized) {
             writerInitialized = true;
-            int numberOfColumns = 
columnWriter.getAbsoluteNumberOfColumns(true);
-            reserveBuffers(numberOfColumns);
-            reservedBufferCount = numberOfColumns;
+            int requiredBuffers = 
columnWriter.getRequiredTemporaryBuffersCountIncludingCurrentTuple();
+            reserveBuffers(requiredBuffers);
+            reservedBufferCount = requiredBuffers;
             columnWriter.init(this);
         }
     }
@@ -183,8 +180,8 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
             return true;
         }
         if (canCurrentTupleFit(tuple)) {
-            int requiredColumns = 
columnWriter.getAbsoluteNumberOfColumns(true);
-            return !ensureSufficientBuffers(requiredColumns);
+            int requiredBuffers = 
columnWriter.getRequiredTemporaryBuffersCountIncludingCurrentTuple();
+            return !ensureSufficientBuffers(requiredBuffers);
         }
         // Not enough space for the tuple.
         return true;
@@ -239,8 +236,8 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
      * @return false, as the first tuple always fits after setup.
      */
     private boolean handleFirstTuple() throws HyracksDataException {
-        int requiredColumns = columnWriter.getAbsoluteNumberOfColumns(true);
-        int additionalBuffersNeeded = requiredColumns - reservedBufferCount;
+        int requiredBuffers = 
columnWriter.getRequiredTemporaryBuffersCountIncludingCurrentTuple();
+        int additionalBuffersNeeded = requiredBuffers - reservedBufferCount;
         if (additionalBuffersNeeded > 0) {
             reserveBuffers(additionalBuffersNeeded);
             reservedBufferCount += additionalBuffersNeeded;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ColumnBufferPool.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ColumnBufferPool.java
index 5a15f12978..35c484d8cd 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ColumnBufferPool.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ColumnBufferPool.java
@@ -25,9 +25,9 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -36,20 +36,21 @@ import org.apache.logging.log4j.Logger;
  *
  * NOTE: getBuffer() and recycle() are called very frequently and are designed 
to be as lightweight as possible.
  */
-public class ColumnBufferPool implements IColumnBufferPool, 
ILifeCycleComponent {
+public class ColumnBufferPool implements IColumnBufferPool {
     protected static final Logger LOGGER = LogManager.getLogger();
 
     private final BlockingQueue<ByteBuffer> bufferPool;
     private final AtomicLong totalAllocatedMemoryInBytes;
     private final AtomicLong totalPooledMemoryInBytes;
-    private final AtomicLong numAllocNew;
+    private final AtomicInteger numAllocNew;
+    private final AtomicInteger buffersAllocated;
 
     // Semaphore for buffer-based allocation
     private final Semaphore bufferSemaphore;
-    private final long maxBufferPoolMemoryLimit;
-    private final double columnBufferPoolMemoryPercentage;
+    private final long bufferPoolMemoryLimit;
+    private final long columnBufferPoolMaxMemory;
     private final int columnBufferInBytes;
-    private final int columnBufferPoolMaxSize;
+    private final int columnBufferPoolCapacity;
     private final int maxBuffers;
 
     // Timeout for buffer reservation in milliseconds
@@ -58,45 +59,43 @@ public class ColumnBufferPool implements IColumnBufferPool, 
ILifeCycleComponent
     /**
      * @param columnBufferInBytes buffer size in bytes
      * @param columnBufferPoolMaxSize max number of buffers in pool
-     * @param columnBufferPoolMemoryPercentage max percentage of total memory 
for pool
+     * @param columnBufferPoolMaxMemory max memory in bytes for pool
      * @param reserveTimeoutMillis timeout in milliseconds for buffer 
reservation
      */
-    public ColumnBufferPool(int columnBufferInBytes, int 
columnBufferPoolMaxSize,
-            double columnBufferPoolMemoryPercentage, long 
reserveTimeoutMillis) {
+    public ColumnBufferPool(int columnBufferInBytes, int 
columnBufferPoolMaxSize, long columnBufferPoolMaxMemory,
+            long reserveTimeoutMillis) {
         this.totalAllocatedMemoryInBytes = new AtomicLong(0);
-        this.bufferPool = new ArrayBlockingQueue<>(columnBufferPoolMaxSize);
-        this.columnBufferPoolMemoryPercentage = 
columnBufferPoolMemoryPercentage;
-        this.columnBufferPoolMaxSize = columnBufferPoolMaxSize;
+        this.columnBufferPoolMaxMemory = columnBufferPoolMaxMemory;
         this.reserveTimeoutMillis = reserveTimeoutMillis;
-        this.maxBufferPoolMemoryLimit = 
getMaxBufferPoolMemoryLimit(columnBufferInBytes,
-                columnBufferPoolMemoryPercentage, columnBufferPoolMaxSize);
-        this.maxBuffers = (int) (maxBufferPoolMemoryLimit / 
columnBufferInBytes);
+        this.bufferPoolMemoryLimit =
+                getMaxBufferPoolMemoryLimit(columnBufferInBytes, 
columnBufferPoolMaxMemory, columnBufferPoolMaxSize);
+        this.maxBuffers = (int) (bufferPoolMemoryLimit / columnBufferInBytes);
         this.bufferSemaphore = new Semaphore(maxBuffers, true);
         this.columnBufferInBytes = columnBufferInBytes;
         this.totalPooledMemoryInBytes = new AtomicLong(0);
-        this.numAllocNew = new AtomicLong(0);
-        initializePool();
+        this.buffersAllocated = new AtomicInteger(0);
+        this.numAllocNew = new AtomicInteger(0);
+        this.columnBufferPoolCapacity = Math.min(maxBuffers, 
columnBufferPoolMaxSize);
+        this.bufferPool = new ArrayBlockingQueue<>(columnBufferPoolCapacity);
+        initializePool(columnBufferPoolCapacity);
         LOGGER.info(
-                "ColumnBufferPool initialized: columnBufferPoolMaxSize={}, 
maxBufferPoolMemoryLimit={}, maxBuffers={}, columnBufferInBytes={}, 
reserveTimeoutMillis={}",
-                columnBufferPoolMaxSize, maxBufferPoolMemoryLimit, maxBuffers, 
columnBufferInBytes,
-                reserveTimeoutMillis);
+                "ColumnBufferPool initialized: columnBufferPoolMaxSize={}, 
maxBufferPoolMemoryLimit={}, maxBuffers={}, columnBufferPoolCapacity={}, 
columnBufferInBytes={}, reserveTimeoutMillis={}",
+                columnBufferPoolMaxSize, columnBufferPoolMaxMemory, 
maxBuffers, columnBufferPoolCapacity,
+                columnBufferInBytes, reserveTimeoutMillis);
     }
 
     /**
      * Pre-allocate buffers for fast buffer access.
      * The number of buffers pre-allocated is half of the pool capacity (the
      * smaller of the configured pool size and what the memory limit allows).
      */
-    private void initializePool() {
-        int halfPoolSize = columnBufferPoolMaxSize / 2;
-        int numBuffersToAllocate = Math.min(halfPoolSize, maxBuffers);
-
-        for (int i = 0; i < numBuffersToAllocate; i++) {
+    private void initializePool(int bufferPoolCapacity) {
+        for (int i = 0; i < bufferPoolCapacity / 2; i++) {
             ByteBuffer buffer = ByteBuffer.allocate(columnBufferInBytes);
             bufferPool.add(buffer);
             totalPooledMemoryInBytes.addAndGet(columnBufferInBytes);
         }
-        LOGGER.info("ColumnBufferPool pre-allocated {} buffers ({} bytes 
each)", numBuffersToAllocate,
-                columnBufferInBytes);
+        LOGGER.info("ColumnBufferPool pre-allocated {} buffers having 
bufferPoolSize {} ({} bytes each)",
+                bufferPoolCapacity / 2, bufferPoolCapacity, 
columnBufferInBytes);
     }
 
     /**
@@ -112,8 +111,15 @@ public class ColumnBufferPool implements 
IColumnBufferPool, ILifeCycleComponent
         }
         boolean acquired = bufferSemaphore.tryAcquire(requestedBuffers, 
reserveTimeoutMillis, TimeUnit.MILLISECONDS);
         if (!acquired) {
-            LOGGER.error("Failed to reserve {} buffers within {} ms ({} sec)", 
requestedBuffers, reserveTimeoutMillis,
-                    TimeUnit.MILLISECONDS.toSeconds(reserveTimeoutMillis));
+            LOGGER.error(
+                    "Failed to reserve {} buffers within {} ms ({} sec); "
+                            + "totalAllocatedMemoryInBytes={}, 
maxBufferPoolMemoryLimit={}, bufferPoolCapacity={}, columnBufferInBytes={}, "
+                            + "columnBufferPoolMaxMemory={}, numAllocNew={}, 
availableBuffers={}. "
+                            + "Consider increasing 
storageColumnBufferPoolMaxMemory (current: {}).",
+                    requestedBuffers, reserveTimeoutMillis, 
TimeUnit.MILLISECONDS.toSeconds(reserveTimeoutMillis),
+                    totalAllocatedMemoryInBytes.get(), bufferPoolMemoryLimit, 
columnBufferPoolCapacity,
+                    columnBufferInBytes, columnBufferPoolMaxMemory, 
numAllocNew.get(),
+                    bufferSemaphore.availablePermits(), 
columnBufferPoolMaxMemory);
             throw new IllegalStateException("Timeout while reserving column 
buffers (" + requestedBuffers + ") after "
                     + reserveTimeoutMillis + " ms");
         }
@@ -140,6 +146,7 @@ public class ColumnBufferPool implements IColumnBufferPool, 
ILifeCycleComponent
      */
     @Override
     public ByteBuffer getBuffer() {
+        buffersAllocated.incrementAndGet();
         // Fast path: try to poll from pool
         ByteBuffer buffer = bufferPool.poll();
         if (buffer != null) {
@@ -165,7 +172,7 @@ public class ColumnBufferPool implements IColumnBufferPool, 
ILifeCycleComponent
         if (buffer == null) {
             throw new IllegalStateException("buffer is null");
         }
-
+        buffersAllocated.decrementAndGet();
         // Try to return to pool; if full, discard
         if (bufferPool.offer(buffer)) {
             totalPooledMemoryInBytes.addAndGet(columnBufferInBytes);
@@ -184,28 +191,26 @@ public class ColumnBufferPool implements IColumnBufferPool, ILifeCycleComponent
      * Throws if the limit would be exceeded.
      */
     private void ensureAvailableQuota() {
-        long spaceAcquiredByPool = (long) columnBufferPoolMaxSize * columnBufferInBytes;
-        long totalAllocated = totalAllocatedMemoryInBytes.get();
-        long totalIfAllocated = totalAllocated + spaceAcquiredByPool;
-        if (totalIfAllocated > maxBufferPoolMemoryLimit) {
+        if (buffersAllocated.get() > maxBuffers) {
+            long spaceAcquiredByPool = (long) columnBufferPoolCapacity * columnBufferInBytes;
+            long totalAllocated = totalAllocatedMemoryInBytes.get();
+            long totalIfAllocated = totalAllocated + spaceAcquiredByPool;
             LOGGER.error(
                     "Cannot allocate more buffers, memory limit reached. "
-                            + "totalAllocatedMemoryInBytes={}, maxBufferPoolMemoryLimit={}, columnBufferPoolMaxSize={}, columnBufferInBytes={}, "
-                            + "columnBufferPoolMemoryPercentage={}, totalIfAllocated={}, numAllocNew={}, availableBuffers={}. "
-                            + "Consider increasing storageColumnBufferPoolMemoryPercentage (current: {}).",
-                    totalAllocated, maxBufferPoolMemoryLimit, columnBufferPoolMaxSize, columnBufferInBytes,
-                    columnBufferPoolMemoryPercentage, totalIfAllocated, numAllocNew.get(),
-                    bufferSemaphore.availablePermits(), columnBufferPoolMemoryPercentage);
-            throw new IllegalStateException("Cannot allocate more buffers, maxBufferPoolMemoryLimit ("
-                    + maxBufferPoolMemoryLimit + ") reached.");
+                            + "totalAllocatedMemoryInBytes={}, maxBufferPoolMemoryLimit={}, columnBufferPoolCapacity={}, columnBufferInBytes={}, "
+                            + "columnBufferPoolMaxMemory={}, totalIfAllocated={}, numAllocNew={}, availableBuffers={}. "
+                            + "Consider increasing storageColumnBufferPoolMaxMemory (current: {}).",
+                    totalAllocated, bufferPoolMemoryLimit, columnBufferPoolCapacity, columnBufferInBytes,
+                    columnBufferPoolMaxMemory, totalIfAllocated, numAllocNew.get(), bufferSemaphore.availablePermits(),
+                    columnBufferPoolMaxMemory);
+            throw new IllegalStateException(
+                    "Cannot allocate more buffers, maxBufferPoolMemoryLimit (" + bufferPoolMemoryLimit + ") reached.");
         }
     }
 
-    private long getMaxBufferPoolMemoryLimit(int columnBufferInBytes, double columnBufferPoolMemoryPercentage,
+    private long getMaxBufferPoolMemoryLimit(int columnBufferInBytes, long columnBufferPoolMaxMemoryLimit,
             int columnBufferPoolMaxSize) {
-        long totalMemory = Runtime.getRuntime().totalMemory();
-        return (long) Math.max(totalMemory * (columnBufferPoolMemoryPercentage / 100),
-                columnBufferInBytes * columnBufferPoolMaxSize);
+        return Math.max(columnBufferPoolMaxMemoryLimit, (long) columnBufferInBytes * columnBufferPoolMaxSize);
     }
 
     @Override
@@ -225,7 +230,7 @@ public class ColumnBufferPool implements IColumnBufferPool, ILifeCycleComponent
     public void dumpState(OutputStream os) throws IOException {
         long pooledBytes = totalPooledMemoryInBytes.get();
         long totalAllocatedBytes = totalAllocatedMemoryInBytes.get();
-        String buffer = "ColumnBufferPool State:\n" + "columnBufferPoolMaxSize: " + columnBufferPoolMaxSize + "\n"
+        String buffer = "ColumnBufferPool State:\n" + "columnBufferPoolCapacity: " + columnBufferPoolCapacity + "\n"
                 + "Max Buffers: " + maxBuffers + "\n" + "Total Pooled Memory (bytes): " + pooledBytes + "\n"
                 + "Total Allocated Memory (bytes): " + totalAllocatedBytes + "\n" + "Number of Buffers Allocated New: "
                 + numAllocNew.get() + "\n" + "Available Buffers: " + bufferSemaphore.availablePermits();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FreeColumnBufferPool.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FreeColumnBufferPool.java
index e47478f4b0..43593f1e31 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FreeColumnBufferPool.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FreeColumnBufferPool.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.storage.common.buffercache;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.util.StorageUtil;
@@ -59,4 +61,19 @@ public class FreeColumnBufferPool implements IColumnBufferPool {
     public void close() {
 
     }
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void dumpState(OutputStream os) throws IOException {
+
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
+
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IColumnBufferPool.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IColumnBufferPool.java
index 9fc3027799..4040e5e1f9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IColumnBufferPool.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IColumnBufferPool.java
@@ -20,6 +20,8 @@ package org.apache.hyracks.storage.common.buffercache;
 
 import java.nio.ByteBuffer;
 
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+
 /**
  * Interface for a buffer-based pool managing column buffer allocation.
  *
@@ -29,7 +31,7 @@ import java.nio.ByteBuffer;
  * - Buffers are acquired via getBuffer() and returned via recycle().
  * - The pool may block or fail to allocate if semaphore permits are exhausted.
  */
-public interface IColumnBufferPool {
+public interface IColumnBufferPool extends ILifeCycleComponent {
 
     /**
      * Reserve the specified number of buffers, blocking if necessary until available.
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 5788823a94..cdc36a3dd8 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -159,7 +159,8 @@ public class TestStorageManagerComponentHolder {
             return columnBufferPool;
         }
 
-        columnBufferPool = new ColumnBufferPool(4 * 1024, 500, 3.0, TimeUnit.MINUTES.toMillis(2));
+        columnBufferPool = new ColumnBufferPool(4 * 1024, 500, (long) (0.03 * Runtime.getRuntime().maxMemory()),
+                TimeUnit.MINUTES.toMillis(2));
         return columnBufferPool;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/ColumnBufferPoolTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/ColumnBufferPoolTest.java
index 0428b48e06..de95944197 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/ColumnBufferPoolTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/ColumnBufferPoolTest.java
@@ -42,13 +42,13 @@ public class ColumnBufferPoolTest {
     private ColumnBufferPool columnBufferPool;
     private static final int COLUMN_BUFFER_GRANULE_BYTES = 4096; // 4KB
     private static final int POOL_SIZE = 10;
-    private static final double CAPPED_PERCENTAGE = 10.0; // 10% of runtime memory
+    private static final long COLUMN_BUFFER_MAX_MEMORY_LIMIT = getColumnBufferPoolMaxMemoryLimit(10); // 10% of runtime memory
     private static final long RESERVE_TIMEOUT_MILLIS = 5000; // 5 seconds timeout
 
     @Before
     public void setUp() {
-        columnBufferPool =
-                new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, CAPPED_PERCENTAGE, RESERVE_TIMEOUT_MILLIS);
+        columnBufferPool = new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, COLUMN_BUFFER_MAX_MEMORY_LIMIT,
+                RESERVE_TIMEOUT_MILLIS);
     }
 
     @After
@@ -103,7 +103,7 @@ public class ColumnBufferPoolTest {
     public void testReserveTimeout() throws InterruptedException {
         // Use a shorter timeout for this test
         ColumnBufferPool shortTimeoutPool =
-                new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, CAPPED_PERCENTAGE, 100); // 100ms timeout
+                new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, COLUMN_BUFFER_MAX_MEMORY_LIMIT, 100); // 100ms timeout
         try {
             // Reserve all credits
             int maxCredits = shortTimeoutPool.getMaxReservedBuffers();
@@ -208,7 +208,7 @@ public class ColumnBufferPoolTest {
     public void testMultipleThreadsConcurrentExecution() throws InterruptedException {
         // Use shorter timeout to make timeouts happen faster in test
         ColumnBufferPool testPool =
-                new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, CAPPED_PERCENTAGE, 500); // 500ms timeout
+                new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, COLUMN_BUFFER_MAX_MEMORY_LIMIT, 500); // 500ms timeout
 
         // Helper for timestamped log
         java.util.function.Consumer<String> logWithTimestamp = msg -> {
@@ -475,7 +475,7 @@ public class ColumnBufferPoolTest {
     @Test(expected = IllegalStateException.class)
     public void testMemoryQuotaEnforcement() throws InterruptedException {
         // Create a small pool to test quota enforcement
-        ColumnBufferPool smallPool = new ColumnBufferPool(1024, 2, 0.001, 1000); // Very small percentage
+        ColumnBufferPool smallPool = new ColumnBufferPool(1024, 2, getColumnBufferPoolMaxMemoryLimit(0.001), 1000); // Very small percentage
         try {
             int maxCredits = smallPool.getMaxReservedBuffers();
             smallPool.reserve(maxCredits);
@@ -511,4 +511,8 @@ public class ColumnBufferPoolTest {
     private int getMaxCredits() {
         return columnBufferPool.getMaxReservedBuffers();
     }
+
+    private static long getColumnBufferPoolMaxMemoryLimit(double percentage) {
+        return (long) ((percentage / 100) * Runtime.getRuntime().maxMemory());
+    }
 }

Reply via email to