This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 43137d598aeddc42de33531f4556ae8950970bf4
Author: Ritik Raj <[email protected]>
AuthorDate: Wed Aug 20 21:48:39 2025 +0530

    [ASTERIXDB-3636][STO] Add ColumnBufferPool to prevent OOM during 
high-volume column ingestion
    
    - user model changes: yes
    - storage format changes: no
    - interface changes: yes
    
    details:
    
    Introduce a semaphore-based buffer pool to manage configurable
    column buffers(default: 4KB) and prevent excessive memory
    allocation during high ingestion rates or when processing
    datasets with large numbers of columns.
    
    - Implement reservation-based flow control using Semaphore for buffer 
allocation
    - Add buffer recycling to reuse existing buffers instead of creating new 
ones
    - Configure memory limits based on percentage of total JVM memory
    - Include timeout mechanisms for buffer reservation to prevent deadlocks
    - Add logging for buffer pool health
    - Prevent JVM OOM and reduce GC overhead from excessive buffer creation
    
    Without pooling, each column's zeroth buffer (32KB) would be allocated
    independently, leading to memory exhaustion and GC pressure in scenarios
    with many columns or high ingestion throughput.
    
    Ext-ref: MB-68059
    Change-Id: I62437839d89b11d950e6715a00e844aedd0dab8e
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20249
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../apache/asterix/app/nc/NCAppRuntimeContext.java |  22 +
 .../api/cluster_state_1/cluster_state_1.1.regexadm |   4 +
 .../cluster_state_1_full.1.regexadm                |   4 +
 .../cluster_state_1_less.1.regexadm                |   4 +
 .../out/AbstractMultiBufferBytesOutputStream.java  |  10 +-
 .../out/MultiTemporaryBufferBytesOutputStream.java |  57 ++-
 .../operation/lsm/flush/FlushColumnMetadata.java   |   6 +-
 .../lsm/flush/FlushColumnTupleWriter.java          |   6 +
 .../lsm/merge/MergeColumnTupleWriter.java          |   6 +
 .../lsm/merge/MergeColumnWriteMetadata.java        |   2 +-
 .../column/common/buffer/NoOpWriteMultiPageOp.java |  10 +
 .../column/common/buffer/TestWriteMultiPageOp.java |  10 +
 .../asterix/common/api/INcApplicationContext.java  |   3 +
 .../asterix/common/config/StorageProperties.java   |  42 +-
 .../runtime/utils/RuntimeComponentsProvider.java   |   6 +
 .../hyracks/control/common/config/OptionTypes.java |  94 +++-
 .../btree/helper/BTreeHelperStorageManager.java    |   6 +
 .../examples/btree/helper/RuntimeContext.java      |   9 +
 .../column/api/AbstractColumnTupleWriter.java      |   5 +
 .../btree/column/api/IColumnWriteMultiPageOp.java  |  11 +
 .../dataflow/LSMColumnBTreeLocalResource.java      |   5 +-
 .../lsm/btree/column/impls/btree/ColumnBTree.java  |  10 +-
 .../column/impls/btree/ColumnBTreeBulkloader.java  | 315 +++++++++----
 .../column/impls/btree/ColumnBTreeFactory.java     |  10 +-
 .../lsm/btree/column/impls/lsm/LSMColumnBTree.java |  22 +-
 .../lsm/btree/column/utils/LSMColumnBTreeUtil.java |  33 +-
 .../hyracks/storage/common/IStorageManager.java    |   6 +
 .../common/buffercache/ColumnBufferPool.java       | 243 ++++++++++
 .../common/buffercache/FreeColumnBufferPool.java   |  36 +-
 .../common/buffercache/IColumnBufferPool.java      |  78 ++++
 .../hyracks/test/support/TestStorageManager.java   |   6 +
 .../support/TestStorageManagerComponentHolder.java |  21 +
 .../storage/common/ColumnBufferPoolTest.java       | 514 +++++++++++++++++++++
 33 files changed, 1448 insertions(+), 168 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 6416df9245..9d80525f20 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -117,11 +117,14 @@ import 
org.apache.hyracks.storage.am.lsm.common.impls.GreedyScheduler;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import 
org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import org.apache.hyracks.storage.common.buffercache.ColumnBufferPool;
 import 
org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
 import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
+import org.apache.hyracks.storage.common.buffercache.FreeColumnBufferPool;
 import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool;
 import org.apache.hyracks.storage.common.buffercache.IDiskCachedPageAllocator;
 import org.apache.hyracks.storage.common.buffercache.IPageCleanerPolicy;
 import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
@@ -145,6 +148,7 @@ import org.apache.logging.log4j.Logger;
 
 public class NCAppRuntimeContext implements INcApplicationContext {
     private static final Logger LOGGER = LogManager.getLogger();
+    public static final double INVALID_BUFFER_POOL_MEMORY_PERCENTAGE = 0.0;
 
     private ILSMMergePolicyFactory metadataMergePolicyFactory;
     private final INCServiceContext ncServiceContext;
@@ -162,6 +166,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     private ExecutorService threadExecutor;
     private IDatasetLifecycleManager datasetLifecycleManager;
     private IBufferCache bufferCache;
+    private IColumnBufferPool columnBufferPool;
     private IVirtualBufferCache virtualBufferCache;
     private ITransactionSubsystem txnSubsystem;
     private IMetadataNode metadataNodeStub;
@@ -325,6 +330,17 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
                     fileInfoMap, defaultContext);
         }
 
+        if (storageProperties.getColumnBufferPoolMemoryPercentage() <= 
INVALID_BUFFER_POOL_MEMORY_PERCENTAGE) {
+            LOGGER.info("Using FreeColumnBufferPool since column buffer pool 
size percentage is {}",
+                    storageProperties.getColumnBufferSize());
+            this.columnBufferPool = new FreeColumnBufferPool();
+        } else {
+            this.columnBufferPool = new 
ColumnBufferPool(storageProperties.getColumnBufferSize(),
+                    storageProperties.getColumnBufferPoolMaxSize(),
+                    storageProperties.getColumnBufferPoolMemoryPercentage(),
+                    storageProperties.getColumnBufferAcquireTimeout());
+        }
+
         if (cloudConfigurator != null) {
             diskCacheService =
                     
cloudConfigurator.createDiskCacheMonitoringService(getServiceContext(), 
bufferCache, fileInfoMap);
@@ -347,6 +363,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         ILifeCycleComponentManager lccm = 
getServiceContext().getLifeCycleComponentManager();
         lccm.register((ILifeCycleComponent) virtualBufferCache);
         lccm.register((ILifeCycleComponent) bufferCache);
+        lccm.register((ILifeCycleComponent) columnBufferPool);
         /*
          * LogManager must be stopped after RecoveryManager, 
DatasetLifeCycleManager, and ReplicationManager
          * to process any logs that might be generated during stopping these 
components
@@ -404,6 +421,11 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         return bufferCache;
     }
 
+    @Override
+    public IColumnBufferPool getColumnBufferPool() {
+        return columnBufferPool;
+    }
+
     @Override
     public IVirtualBufferCache getVirtualBufferCache() {
         return virtualBufferCache;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index 6c78c21b42..24f084d814 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -90,6 +90,10 @@
     "replication\.timeout" : 120,
     "ssl\.enabled" : false,
     "storage.buffercache.pagesize" : 32768,
+    "storage.column.buffer.acquire.timeout" : 120000,
+    "storage.column.buffer.pool.max.size" : 8000,
+    "storage.column.buffer.pool.memory.percentage" : 3.0,
+    "storage.column.buffer.size" : 4096,
     "storage.column.free.space.tolerance" : 0.15,
     "storage.column.max.leaf.node.size" : 10485760,
     "storage.column.max.tuple.count" : 15000,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index c44e600a3b..89aa04551b 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -90,6 +90,10 @@
     "replication\.timeout" : 120,
     "ssl\.enabled" : false,
     "storage.buffercache.pagesize" : 32768,
+    "storage.column.buffer.acquire.timeout" : 120000,
+    "storage.column.buffer.pool.max.size" : 8000,
+    "storage.column.buffer.pool.memory.percentage" : 3.0,
+    "storage.column.buffer.size" : 4096,
     "storage.column.free.space.tolerance" : 0.15,
     "storage.column.max.leaf.node.size" : 10485760,
     "storage.column.max.tuple.count" : 15000,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 10eab67e07..1882daff53 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -90,6 +90,10 @@
     "replication\.timeout" : 120,
     "ssl\.enabled" : false,
     "storage.buffercache.pagesize" : 32768,
+    "storage.column.buffer.acquire.timeout" : 120000,
+    "storage.column.buffer.pool.max.size" : 8000,
+    "storage.column.buffer.pool.memory.percentage" : 3.0,
+    "storage.column.buffer.size" : 4096,
     "storage.column.free.space.tolerance" : 0.15,
     "storage.column.max.leaf.node.size" : 10485760,
     "storage.column.max.tuple.count" : 15000,
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java
index 0c311f1433..33cb5d86f2 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java
@@ -47,7 +47,7 @@ abstract class AbstractMultiBufferBytesOutputStream extends 
AbstractBytesOutputS
     protected abstract void preReset() throws HyracksDataException;
 
     @Override
-    public final void reset() throws HyracksDataException {
+    public void reset() throws HyracksDataException {
         preReset();
         position = 0;
         currentBufferIndex = 0;
@@ -149,6 +149,7 @@ abstract class AbstractMultiBufferBytesOutputStream extends 
AbstractBytesOutputS
     @Override
     public final void finish() {
         currentBuf = null;
+        releaseColumnBuffer();
         buffers.clear();
         allocatedBytes = 0;
     }
@@ -158,7 +159,7 @@ abstract class AbstractMultiBufferBytesOutputStream extends 
AbstractBytesOutputS
      * *************************************************
      */
 
-    private void ensureCapacity(int length) throws HyracksDataException {
+    protected void ensureCapacity(int length) throws HyracksDataException {
         if (position + length > allocatedBytes) {
             allocateMoreBuffers(length);
         } else if (length > 0) {
@@ -195,4 +196,9 @@ abstract class AbstractMultiBufferBytesOutputStream extends 
AbstractBytesOutputS
             allocateBuffer();
         }
     }
+
+    protected void releaseColumnBuffer() {
+        // No-op by default
+        // Overridden in MultiTemporaryBufferBytesOutputStream
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
index 8988026ab9..6642cd8ef0 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
@@ -25,15 +25,23 @@ import java.nio.ByteBuffer;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
-import org.apache.hyracks.util.StorageUtil;
 
 public final class MultiTemporaryBufferBytesOutputStream extends 
AbstractMultiBufferBytesOutputStream {
-    private static final int INITIAL_BUFFER_SIZE = 
StorageUtil.getIntSizeInBytes(32, StorageUtil.StorageUnit.KILOBYTE);
+    private boolean isFirstBufferFromColumnPool = false;
 
     public 
MultiTemporaryBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp> 
multiPageOpRef) {
         super(multiPageOpRef);
     }
 
+    // Don't initialize for temporary buffers, as they are confiscated from 
the pool
+    @Override
+    public void reset() throws HyracksDataException {
+        preReset();
+        position = 0;
+        currentBufferIndex = 0;
+        releaseColumnBuffer();
+    }
+
     @Override
     protected void preReset() {
         //NoOp
@@ -41,21 +49,60 @@ public final class MultiTemporaryBufferBytesOutputStream 
extends AbstractMultiBu
 
     @Override
     protected ByteBuffer confiscateNewBuffer() throws HyracksDataException {
-        if (buffers.isEmpty()) {
+        // When a buffer reset occurs, the 0th buffer is set to null.
+        // If buffers are not empty, this means a buffer from the column pool 
was previously used.
+        // If the 0th buffer is null, a new buffer must be confiscated.
+        if (buffers.isEmpty() || buffers.get(0) == null) {
             /*
-             * One buffer on the house to avoid confiscating a whole page for 
a tiny stream.
+             * One buffer from the pool to avoid confiscating a whole page for 
a tiny stream.
              * This protects pressuring the buffer cache from confiscating 
pages for small columns. Think sparse
              * columns, which may take only a few hundreds of bytes to write.
              */
-            return ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+            isFirstBufferFromColumnPool = true;
+            return multiPageOpRef.getValue().confiscateTemporary0thBuffer();
         }
         return multiPageOpRef.getValue().confiscateTemporary();
     }
 
+    @Override
+    protected void ensureCapacity(int length) throws HyracksDataException {
+        ensure0thBufferAllocated();
+        super.ensureCapacity(length);
+    }
+
+    private void ensure0thBufferAllocated() throws HyracksDataException {
+        if ((currentBufferIndex == 0 && !isFirstBufferFromColumnPool)) {
+            // grab one from pool, as the buffers has already been reserved.
+            currentBuf = confiscateNewBuffer();
+            boolean isBufferEmpty = buffers.isEmpty();
+            currentBuf.clear();
+            if (buffers.isEmpty()) {
+                buffers.add(currentBuf);
+            } else {
+                buffers.set(0, currentBuf);
+            }
+            if (isBufferEmpty) {
+                allocatedBytes += currentBuf.capacity();
+            }
+        }
+    }
+
+    @Override
+    protected void releaseColumnBuffer() {
+        if (!isFirstBufferFromColumnPool) {
+            return;
+        }
+        multiPageOpRef.getValue().releaseTemporary0thBuffer(buffers.get(0));
+        isFirstBufferFromColumnPool = false;
+        buffers.set(0, null);
+    }
+
     @Override
     public void writeTo(OutputStream outputStream) throws IOException {
         int writtenSize = 0;
         int numberOfUsedBuffers = currentBufferIndex + 1;
+        // This can be the case where the empty columns are being written
+        ensure0thBufferAllocated();
         for (int i = 0; i < numberOfUsedBuffers; i++) {
             ByteBuffer buffer = buffers.get(i);
             outputStream.write(buffer.array(), 0, buffer.position());
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
index b45d7d91de..21ad11b3c6 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnMetadata.java
@@ -457,11 +457,13 @@ public class FlushColumnMetadata extends 
AbstractColumnMetadata {
     }
 
     public void close() {
-        //Dereference multiPageOp
-        multiPageOpRef.setValue(null);
         for (int i = 0; i < columnWriters.size(); i++) {
             columnWriters.get(i).close();
         }
+        // In close, there is a rest call, where the buffer is being returned 
to @link ColumnBufferPool
+        // hence, we should dereference the multiPageOp after close().
+        //Dereference multiPageOp
+        multiPageOpRef.setValue(null);
     }
 
     protected void flushDefinitionLevels(int parentMask, int childMask, 
RunLengthIntArray parentDefLevels,
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index b33a2b4c00..c0fd0493cb 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -200,6 +200,12 @@ public class FlushColumnTupleWriter extends 
AbstractColumnTupleWriter {
         writer.close();
     }
 
+    @Override
+    public final void abort() {
+        // this call will reset the writers, releasing the 0th buffer back to 
the pool
+        columnMetadata.close();
+    }
+
     public void updateColumnMetadataForCurrentTuple(ITupleReference tuple) 
throws HyracksDataException {
         // Execution can reach here in case of Load statements
         // and the type of tuple in that case is PermutingFrameTupleReference
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index 9feb789180..2c7ae4d9bf 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -243,6 +243,12 @@ public class MergeColumnTupleWriter extends 
AbstractColumnTupleWriter {
         writer.close();
     }
 
+    @Override
+    public final void abort() {
+        // this call will reset the writers, releasing the 0th buffer back to 
the pool
+        columnMetadata.close();
+    }
+
     @Override
     public void reset() {
     }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
index b0d1a01015..9860822ec2 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
@@ -85,10 +85,10 @@ public final class MergeColumnWriteMetadata extends 
AbstractColumnImmutableMetad
     }
 
     public void close() {
-        multiPageOpRef.setValue(null);
         for (int i = 0; i < columnWriters.size(); i++) {
             columnWriters.get(i).close();
         }
+        multiPageOpRef.setValue(null);
     }
 
     public static MergeColumnWriteMetadata create(ARecordType datasetType, 
ARecordType metaType,
diff --git 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java
 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java
index aa3cb71aaf..3a8304ec16 100644
--- 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java
+++ 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java
@@ -44,6 +44,16 @@ public class NoOpWriteMultiPageOp implements 
IColumnWriteMultiPageOp {
         return null;
     }
 
+    @Override
+    public ByteBuffer confiscateTemporary0thBuffer() {
+        return null;
+    }
+
+    @Override
+    public void releaseTemporary0thBuffer(ByteBuffer buffer) {
+
+    }
+
     @Override
     public void persist() throws HyracksDataException {
 
diff --git 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java
 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java
index 556a87c82f..fb19a7e624 100644
--- 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java
+++ 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/TestWriteMultiPageOp.java
@@ -47,6 +47,16 @@ public class TestWriteMultiPageOp implements 
IColumnWriteMultiPageOp {
         return dummyBufferCache.allocateTemporary();
     }
 
+    @Override
+    public ByteBuffer confiscateTemporary0thBuffer() {
+        return ByteBuffer.allocate(32 * 1024); // 32KB
+    }
+
+    @Override
+    public void releaseTemporary0thBuffer(ByteBuffer buffer) {
+
+    }
+
     @Override
     public void persist() {
         //NoOp
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 888cea1186..6d4a428876 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -43,6 +43,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool;
 import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 import org.apache.hyracks.util.cache.ICacheManager;
@@ -69,6 +70,8 @@ public interface INcApplicationContext extends 
IApplicationContext {
 
     IBufferCache getBufferCache();
 
+    IColumnBufferPool getColumnBufferPool();
+
     IVirtualBufferCache getVirtualBufferCache();
 
     ILocalResourceRepository getLocalResourceRepository();
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index c9090c63b3..435d8c2596 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -22,10 +22,13 @@ import static 
org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
 import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER;
 import static 
org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
+import static org.apache.hyracks.control.common.config.OptionTypes.LONG;
 import static 
org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
 import static 
org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER;
 import static 
org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER;
 import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
+import static 
org.apache.hyracks.control.common.config.OptionTypes.getRangedDoubleType;
+import static 
org.apache.hyracks.control.common.config.OptionTypes.getRangedIntegerType;
 import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE;
 import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE;
 
@@ -71,7 +74,11 @@ public class StorageProperties extends AbstractProperties {
         STORAGE_FORMAT(STRING, "row"),
         STORAGE_PARTITIONING(STRING, "dynamic"),
         STORAGE_PARTITIONS_COUNT(INTEGER, 8),
-        STORAGE_MAX_COMPONENT_SIZE(LONG_BYTE_UNIT, 
StorageUtil.getLongSizeInBytes(1, StorageUtil.StorageUnit.TERABYTE));
+        STORAGE_MAX_COMPONENT_SIZE(LONG_BYTE_UNIT, 
StorageUtil.getLongSizeInBytes(1, StorageUtil.StorageUnit.TERABYTE)),
+        STORAGE_COLUMN_BUFFER_SIZE(INTEGER_BYTE_UNIT, 
StorageUtil.getIntSizeInBytes(4, KILOBYTE)),
+        STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE(getRangedDoubleType(0.0, 
100.0), 3.0d),
+        STORAGE_COLUMN_BUFFER_POOL_MAX_SIZE(getRangedIntegerType(1, 50000), 
8000),
+        STORAGE_COLUMN_BUFFER_ACQUIRE_TIMEOUT(LONG, 
TimeUnit.SECONDS.toMillis(120));
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -95,6 +102,10 @@ public class StorageProperties extends AbstractProperties {
                 case STORAGE_COLUMN_MAX_TUPLE_COUNT:
                 case STORAGE_COLUMN_FREE_SPACE_TOLERANCE:
                 case STORAGE_COLUMN_MAX_LEAF_NODE_SIZE:
+                case STORAGE_COLUMN_BUFFER_SIZE:
+                case STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE:
+                case STORAGE_COLUMN_BUFFER_POOL_MAX_SIZE:
+                case STORAGE_COLUMN_BUFFER_ACQUIRE_TIMEOUT:
                     return Section.COMMON;
                 default:
                     return Section.NC;
@@ -163,6 +174,14 @@ public class StorageProperties extends AbstractProperties {
                             + " changed after any dataset has been created";
                 case STORAGE_MAX_COMPONENT_SIZE:
                     return "The resultant disk component after a merge must 
not exceed the specified maximum size.";
+                case STORAGE_COLUMN_BUFFER_SIZE:
+                    return "The size in bytes of each buffer in the column 
buffer pool.";
+                case STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE:
+                    return "The percentage of the JVM memory allocated to the 
column buffer pool. This controls the number of the column buffer relative to 
the total JVM memory.";
+                case STORAGE_COLUMN_BUFFER_POOL_MAX_SIZE:
+                    return "The maximum number of buffers in the column buffer 
pool.";
+                case STORAGE_COLUMN_BUFFER_ACQUIRE_TIMEOUT:
+                    return "The maximum time in milliseconds to wait for 
acquiring a buffer from the column buffer pool.";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -233,6 +252,22 @@ public class StorageProperties extends AbstractProperties {
         return 
accessor.getDouble(Option.STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE);
     }
 
+    public int getColumnBufferSize() {
+        return accessor.getInt(Option.STORAGE_COLUMN_BUFFER_SIZE);
+    }
+
+    public double getColumnBufferPoolMemoryPercentage() {
+        return 
accessor.getDouble(Option.STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE);
+    }
+
+    public int getColumnBufferPoolMaxSize() {
+        return accessor.getInt(Option.STORAGE_COLUMN_BUFFER_POOL_MAX_SIZE);
+    }
+
+    public long getColumnBufferAcquireTimeout() {
+        return accessor.getLong(Option.STORAGE_COLUMN_BUFFER_ACQUIRE_TIMEOUT);
+    }
+
     public int getBufferCacheNumPages() {
         return (int) (getBufferCacheSize() / (getBufferCachePageSize() + 
IBufferCache.RESERVED_HEADER_BYTES));
     }
@@ -248,6 +283,11 @@ public class StorageProperties extends AbstractProperties {
             int maxConcurrentMerges = getMaxConcurrentMerges(numPartitions);
             int maxConcurrentFlushes = getMaxConcurrentFlushes(numPartitions);
             int writeBufferSize = 
runtimeContext.getCloudProperties().getWriteBufferSize();
+
+            double columnBufferPoolSizePercentage =
+                    
accessor.getDouble(Option.STORAGE_COLUMN_BUFFER_POOL_MEMORY_PERCENTAGE);
+            long maxMemoryForColumnBuffers = (long) (MAX_HEAP_BYTES * 
(columnBufferPoolSizePercentage / 100));
+            jobExecutionMemory -= maxMemoryForColumnBuffers;
             jobExecutionMemory -= (long) (maxConcurrentFlushes + 
maxConcurrentMerges) * writeBufferSize;
 
         }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
index 620417be19..6c60bb522f 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RuntimeComponentsProvider.java
@@ -30,6 +30,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProv
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool;
 import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
@@ -59,6 +60,11 @@ public class RuntimeComponentsProvider implements 
IStorageManager, ILSMIOOperati
         return getAppCtx(ctx).getBufferCache();
     }
 
+    @Override
+    public IColumnBufferPool getColumnBufferPool(INCServiceContext ctx) {
+        return getAppCtx(ctx).getColumnBufferPool();
+    }
+
     @Override
     public ILocalResourceRepository 
getLocalResourceRepository(INCServiceContext ctx) {
         return getAppCtx(ctx).getLocalResourceRepository();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
index 5970abb4c4..ff0288bb99 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/config/OptionTypes.java
@@ -73,27 +73,7 @@ public class OptionTypes {
 
     public static final IOptionType<Integer> INTEGER = new IntegerOptionType();
 
-    public static final IOptionType<Double> DOUBLE = new IOptionType<Double>() 
{
-        @Override
-        public Double parse(String s) {
-            return Double.parseDouble(s);
-        }
-
-        @Override
-        public Double parse(JsonNode node) {
-            return node.isNull() ? null : node.asDouble();
-        }
-
-        @Override
-        public Class<Double> targetType() {
-            return Double.class;
-        }
-
-        @Override
-        public void serializeJSONField(String fieldName, Object value, 
ObjectNode node) {
-            node.put(fieldName, (double) value);
-        }
-    };
+    public static final IOptionType<Double> DOUBLE = new DoubleOptionType();
 
     public static final IOptionType<String> STRING = new IOptionType<String>() 
{
         @Override
@@ -260,6 +240,11 @@ public class OptionTypes {
         return new RangedIntegerOptionType(minValueInclusive, 
maxValueInclusive);
     }
 
+    public static IOptionType<Double> getRangedDoubleType(final double 
minValueInclusive,
+            final double maxValueInclusive) {
+        return new RangedDoubleOptionType(minValueInclusive, 
maxValueInclusive);
+    }
+
     public static class IntegerOptionType implements IOptionType<Integer> {
         @Override
         public Integer parse(String s) {
@@ -432,6 +417,73 @@ public class OptionTypes {
         }
     }
 
+    private static class DoubleOptionType implements IOptionType<Double> {
+        @Override
+        public Double parse(String s) {
+            return Double.parseDouble(s);
+        }
+
+        @Override
+        public Double parse(JsonNode node) {
+            return node.isNull() ? null : node.asDouble();
+        }
+
+        @Override
+        public Class<Double> targetType() {
+            return Double.class;
+        }
+
+        @Override
+        public void serializeJSONField(String fieldName, Object value, 
ObjectNode node) {
+            node.put(fieldName, (double) value);
+        }
+    }
+
+    private static class RangedDoubleOptionType extends DoubleOptionType {
+        private final double minValue;
+        private final double maxValue;
+
+        RangedDoubleOptionType(double minValue, double maxValue) {
+            this.minValue = minValue;
+            this.maxValue = maxValue;
+        }
+
+        @Override
+        public Double parse(String value) {
+            double doubleValue = super.parse(value);
+            rangeCheck(doubleValue);
+            return doubleValue;
+        }
+
+        @Override
+        public Double parse(JsonNode node) {
+            if (node.isNull()) {
+                return null;
+            }
+            double doubleValue = node.asDouble();
+            rangeCheck(doubleValue);
+            return doubleValue;
+        }
+
+        void rangeCheck(double doubleValue) {
+            if (Double.isNaN(doubleValue)) {
+                throw new IllegalArgumentException("double value cannot be 
NaN");
+            }
+            if (doubleValue < minValue || doubleValue > maxValue) {
+                if (maxValue == Double.MAX_VALUE) {
+                    if (minValue == 0.0) {
+                        throw new IllegalArgumentException("double value must 
not be negative, but was " + doubleValue);
+                    } else if (minValue == Double.MIN_VALUE) {
+                        throw new IllegalArgumentException(
+                                "double value must be greater than " + 
Double.MIN_VALUE + ", but was " + doubleValue);
+                    }
+                }
+                throw new IllegalArgumentException("double value must be 
between " + minValue + "-" + maxValue
+                        + " (inclusive), but was " + doubleValue);
+            }
+        }
+    }
+
     public static final IOptionType<Long> getRangedLongByteUnit(long min, long 
max) {
         return new LongByteUnit(min, max);
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
index 426e81db8f..487652e31a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/BTreeHelperStorageManager.java
@@ -26,6 +26,7 @@ import 
org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool;
 import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.ResourceIdFactory;
@@ -48,6 +49,11 @@ public class BTreeHelperStorageManager implements 
IStorageManager {
         return RuntimeContext.get(ctx).getBufferCache();
     }
 
+    @Override
+    public IColumnBufferPool getColumnBufferPool(INCServiceContext ctx) {
+        return RuntimeContext.get(ctx).getColumnBufferPool();
+    }
+
     @Override
     public ILocalResourceRepository 
getLocalResourceRepository(INCServiceContext ctx) {
         return RuntimeContext.get(ctx).getLocalResourceRepository();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
index f5edc7469c..08a1dec947 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/RuntimeContext.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.examples.btree.helper;
 
 import java.util.HashMap;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -31,11 +32,13 @@ import 
org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import 
org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import org.apache.hyracks.storage.common.buffercache.ColumnBufferPool;
 import 
org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
 import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
 import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool;
 import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
 import 
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
 import org.apache.hyracks.storage.common.file.FileMapManager;
@@ -51,6 +54,7 @@ import org.apache.hyracks.util.annotations.TestOnly;
 public class RuntimeContext {
     private final IIOManager ioManager;
     private final IBufferCache bufferCache;
+    private final IColumnBufferPool columnBufferPool;
     private final IFileMapManager fileMapManager;
     private final ILocalResourceRepository localResourceRepository;
     private final IResourceLifecycleManager<IIndex> lcManager;
@@ -65,6 +69,7 @@ public class RuntimeContext {
         this.ioManager = appCtx.getIoManager();
         bufferCache = new BufferCache(ioManager, prs, new 
DelayPageCleanerPolicy(1000), fileMapManager, 100, 10,
                 threadFactory, new HashMap<>(), 
DefaultBufferCacheReadContextProvider.DEFAULT);
+        columnBufferPool = new ColumnBufferPool(4 * 1024, 500, 3.0, 
TimeUnit.MINUTES.toMillis(2));
         ILocalResourceRepositoryFactory localResourceRepositoryFactory = new 
TransientLocalResourceRepositoryFactory();
         localResourceRepository = 
localResourceRepositoryFactory.createRepository();
         resourceIdFactory = (new 
ResourceIdFactoryProvider(localResourceRepository)).createResourceIdFactory();
@@ -83,6 +88,10 @@ public class RuntimeContext {
         return bufferCache;
     }
 
+    public IColumnBufferPool getColumnBufferPool() {
+        return columnBufferPool;
+    }
+
     public IFileMapProvider getFileMapManager() {
         return fileMapManager;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
index 60f7967cbd..fa1d9c4167 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/AbstractColumnTupleWriter.java
@@ -96,6 +96,11 @@ public abstract class AbstractColumnTupleWriter extends 
AbstractTupleWriterDisab
      */
     public abstract void close();
 
+    /**
+     * On abort, release all allocated temporary buffers
+     */
+    public abstract void abort();
+
     /**
      * reset the state after flush
      */
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java
index b46b67d097..a54a428201 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnWriteMultiPageOp.java
@@ -54,4 +54,15 @@ public interface IColumnWriteMultiPageOp {
      * @return a {@link IBufferCache}-backed buffer for temporary use
      */
     ByteBuffer confiscateTemporary() throws HyracksDataException;
+
+    /**
+     * @return a {@link 
org.apache.hyracks.storage.common.buffercache.ColumnBufferPool} - backed first 
buffer for temporary use
+     */
+    ByteBuffer confiscateTemporary0thBuffer();
+
+    /**
+     * release the acquired temporary buffer back to the {@link 
org.apache.hyracks.storage.common.buffercache.ColumnBufferPool}
+     * @param buffer
+     */
+    void releaseTemporary0thBuffer(ByteBuffer buffer);
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
index 65e19bbe20..ed8c4f16b7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
@@ -93,8 +93,9 @@ public class LSMColumnBTreeLocalResource extends 
LSMBTreeLocalResource {
         pageWriteCallbackFactory.initialize(serviceCtx, this);
         IDiskCacheMonitoringService diskCacheService = 
storageManager.getDiskCacheMonitoringService(serviceCtx);
         return LSMColumnBTreeUtil.createLSMTree(config, ioManager, vbcs, file,
-                storageManager.getBufferCache(serviceCtx), typeTraits, 
cmpFactories, bloomFilterKeyFields,
-                bloomFilterFalsePositiveRate, 
mergePolicyFactory.createMergePolicy(mergePolicyProperties, serviceCtx),
+                storageManager.getBufferCache(serviceCtx), 
storageManager.getColumnBufferPool(serviceCtx), typeTraits,
+                cmpFactories, bloomFilterKeyFields, 
bloomFilterFalsePositiveRate,
+                mergePolicyFactory.createMergePolicy(mergePolicyProperties, 
serviceCtx),
                 opTrackerProvider.getOperationTracker(serviceCtx, this), 
ioSchedulerProvider.getIoScheduler(serviceCtx),
                 ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields, 
metadataPageManagerFactory, false,
                 serviceCtx.getTracer(), compressorDecompressorFactory, 
nullTypeTraits, nullIntrospector,
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTree.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTree.java
index a863d13b9f..c791d00d28 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTree.java
@@ -36,14 +36,18 @@ import org.apache.hyracks.storage.common.IIndexBulkLoader;
 import org.apache.hyracks.storage.common.IIndexCursorStats;
 import org.apache.hyracks.storage.common.NoOpIndexCursorStats;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool;
 import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
 import 
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
 
 public class ColumnBTree extends DiskBTree {
-    public ColumnBTree(IBufferCache bufferCache, IPageManager freePageManager,
+    private final IColumnBufferPool columnBufferPool;
+
+    public ColumnBTree(IBufferCache bufferCache, IColumnBufferPool 
columnBufferPool, IPageManager freePageManager,
             ITreeIndexFrameFactory interiorFrameFactory, 
ITreeIndexFrameFactory leafFrameFactory,
             IBinaryComparatorFactory[] cmpFactories, int fieldCount, 
FileReference file) {
         super(bufferCache, freePageManager, interiorFrameFactory, 
leafFrameFactory, cmpFactories, fieldCount, file);
+        this.columnBufferPool = columnBufferPool;
     }
 
     @Override
@@ -73,6 +77,10 @@ public class ColumnBTree extends DiskBTree {
         return new ColumnBTreeAccessor(this, iap, index, projectionInfo, 
context);
     }
 
+    public IColumnBufferPool getColumnBufferPool() {
+        return columnBufferPool;
+    }
+
     public class ColumnBTreeAccessor extends DiskBTreeAccessor {
         private final int index;
         private final IColumnProjectionInfo projectionInfo;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
index 244cf47601..0769824425 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeBulkloader.java
@@ -38,6 +38,7 @@ import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnW
 import org.apache.hyracks.storage.common.buffercache.CachedPage;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool;
 import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
 import 
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
 import org.apache.hyracks.storage.common.file.BufferedFileHandle;
@@ -58,8 +59,12 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
     private final IColumnWriteContext columnWriteContext;
     private final int maxColumnsInPageZerothSegment;
     private final IColumnPageZeroWriter.ColumnPageZeroWriterType 
pageZeroWriterType;
+    private final IColumnBufferPool columnBufferPool;
+    private boolean writerInitialized = false;
     private boolean setLowKey;
     private int tupleCount;
+    // reservedBufferCount tracks the number of column buffers currently held 
by this bulkloader.
+    private int reservedBufferCount;
 
     // For logging
     private int numberOfLeafNodes;
@@ -76,13 +81,14 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
             IPageWriteCallback callback, ITreeIndex index, ITreeIndexFrame 
leafFrame,
             IBufferCacheWriteContext writeContext) throws HyracksDataException 
{
         super(fillFactor, verifyInput, callback, index, leafFrame, 
writeContext);
+        columnBufferPool = ((ColumnBTree) index).getColumnBufferPool();
         columnsPages = new ArrayList<>();
         pageZeroSegments = new ArrayList<>();
         tempConfiscatedPages = new ArrayList<>();
         columnWriteContext = (IColumnWriteContext) writeContext;
         columnarFrame = (ColumnBTreeWriteLeafFrame) leafFrame;
         columnWriter = columnarFrame.getColumnTupleWriter();
-        columnWriter.init(this);
+        reservedBufferCount = 0;
         lowKey = new BTreeSplitKey(tupleWriter.createTupleReference());
         lowKey.getTuple().setFieldCount(cmp.getKeyFieldCount());
         setLowKey = true;
@@ -106,7 +112,10 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
 
     @Override
     public void add(ITupleReference tuple) throws HyracksDataException {
-        if (isFull(tuple)) {
+        // track the number of columns in the current tuple
+        columnWriter.updateColumnMetadataForCurrentTuple(tuple);
+        ensureWritersInitialized();
+        if (isLeafPageFull(tuple)) {
             writeFullLeafPage();
             confiscateNewLeafPage();
         }
@@ -120,33 +129,124 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
         tupleCount++;
     }
 
+    /**
+     * Ensures that the column writers are initialized and the correct number 
of column buffers are reserved.
+     * This method is called before any tuples are added.
+     */
+    private void ensureWritersInitialized() throws HyracksDataException {
+        if (!writerInitialized) {
+            writerInitialized = true;
+            int numberOfColumns = 
columnWriter.getAbsoluteNumberOfColumns(true);
+            reserveBuffers(numberOfColumns);
+            reservedBufferCount = numberOfColumns;
+            columnWriter.init(this);
+        }
+    }
+
+    /**
+     * Reserves the specified number of column buffers from the 
columnBufferPool.
+     * If reservation fails, releases any already held buffers and throws an 
exception.
+     */
+    private void reserveBuffers(int numBuffers) throws HyracksDataException {
+        try {
+            columnBufferPool.reserve(numBuffers);
+        } catch (Exception e) {
+            // Release any already held buffers before throwing
+            releaseBuffers();
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void releaseBuffers() {
+        if (reservedBufferCount > 0) {
+            columnBufferPool.release(reservedBufferCount);
+            reservedBufferCount = 0;
+        }
+    }
+
     @Override
     protected ITreeIndexTupleReference createTupleReference() {
         return tupleWriter.createTupleReference();
     }
 
-    private boolean isFull(ITupleReference tuple) throws HyracksDataException {
+    /**
+     * Determines if the current leaf page is full and needs to be flushed.
+     * @param tuple The tuple to be added.
+     * @return true if the page is full, false otherwise.
+     */
+    private boolean isLeafPageFull(ITupleReference tuple) throws 
HyracksDataException {
         if (tupleCount == 0) {
-            columnWriter.updateColumnMetadataForCurrentTuple(tuple);
-            // this is for non-adaptive case.
-            columnWriter.setWriterType(pageZeroWriterType);
-            return false;
-        } else if (tupleCount >= columnWriter.getMaxNumberOfTuples()) {
-            //We reached the maximum number of tuples
+            return handleFirstTuple();
+        }
+        if (tupleCount >= columnWriter.getMaxNumberOfTuples()) {
+            // Maximum tuple count reached for this leaf page.
             return true;
         }
-        //Columns' Offsets
-        columnWriter.updateColumnMetadataForCurrentTuple(tuple);
-        int requiredFreeSpace = 
columnWriter.getPageZeroWriterOccupiedSpace(maxColumnsInPageZerothSegment,
+        if (canCurrentTupleFit(tuple)) {
+            int requiredColumns = 
columnWriter.getAbsoluteNumberOfColumns(true);
+            return !ensureSufficientBuffers(requiredColumns);
+        }
+        // Not enough space for the tuple.
+        return true;
+    }
+
+    /**
+     * Ensures that we have enough column buffers for the required number of 
columns.
+     * Attempts to reserve more buffers if needed.
+     * @param requiredColumns The number of columns needed for the current 
tuple.
+     * @return true if sufficient buffers are available or successfully 
reserved, false otherwise.
+     */
+    private boolean ensureSufficientBuffers(int requiredColumns) throws 
HyracksDataException {
+        if (reservedBufferCount < requiredColumns) {
+            int additionalBuffersNeeded = requiredColumns - 
reservedBufferCount;
+            if (additionalBuffersNeeded > 0) {
+                // Try to reserve additional buffers
+                boolean reserved = 
columnBufferPool.tryReserve(additionalBuffersNeeded);
+                if (reserved) {
+                    reservedBufferCount += additionalBuffersNeeded;
+                    return true;
+                }
+                // Not enough buffers available, do not update 
reservedBufferCount
+                return false;
+            }
+            // If additionalBuffersNeeded == 0, we already have enough
+            return true;
+        }
+        return true;
+    }
+
+    /**
+     * Checks if the current tuple can fit in the current leaf page based on 
space requirements.
+     * @param tuple The tuple to be added.
+     * @return true if the tuple can fit, false otherwise.
+     */
+    private boolean canCurrentTupleFit(ITupleReference tuple) {
+        int requiredSpace = 
columnWriter.getPageZeroWriterOccupiedSpace(maxColumnsInPageZerothSegment,
                 columnarFrame.getBuffer().capacity(), true, 
pageZeroWriterType);
-        //Occupied space from previous writes
-        requiredFreeSpace += columnWriter.getPrimaryKeysEstimatedSize();
-        //min and max tuples' sizes
-        requiredFreeSpace += lowKey.getTuple().getTupleSize() + 
getSplitKeySize(tuple);
-        //New tuple required space
-        requiredFreeSpace += columnWriter.bytesRequired(tuple);
-        lastRequiredFreeSpace = requiredFreeSpace;
-        return bufferCache.getPageSize() <= requiredFreeSpace;
+        requiredSpace += columnWriter.getPrimaryKeysEstimatedSize();
+        requiredSpace += lowKey.getTuple().getTupleSize() + 
getSplitKeySize(tuple);
+        requiredSpace += columnWriter.bytesRequired(tuple);
+        lastRequiredFreeSpace = requiredSpace;
+
+        // If the page has enough space, check if we have enough buffers.
+        // If not, we need to flush to avoid deadlock on column buffers.
+        return bufferCache.getPageSize() > requiredSpace;
+    }
+
+    /**
+     * Handles setup for the very first tuple in the leaf page.
+     * Reserves the required number of column buffers and sets the writer type.
+     * @return false, as the first tuple always fits after setup.
+     */
+    private boolean handleFirstTuple() throws HyracksDataException {
+        int requiredColumns = columnWriter.getAbsoluteNumberOfColumns(true);
+        int additionalBuffersNeeded = requiredColumns - reservedBufferCount;
+        if (additionalBuffersNeeded > 0) {
+            reserveBuffers(additionalBuffersNeeded);
+            reservedBufferCount += additionalBuffersNeeded;
+        }
+        columnWriter.setWriterType(pageZeroWriterType);
+        return false;
     }
 
     private void setMinMaxKeys(ITupleReference tuple) {
@@ -160,80 +260,91 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
 
     @Override
     public void end() throws HyracksDataException {
-        if (tupleCount > 0) {
-            
splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
-            try {
-                columnarFrame.flush(columnWriter, tupleCount, 
maxColumnsInPageZerothSegment, lowKey.getTuple(),
-                        splitKey.getTuple(), this);
-                updatePageLayoutStat();
-            } catch (Exception e) {
-                logState(e);
-                throw e;
+        try {
+            ensureWritersInitialized();
+            if (tupleCount > 0) {
+                
splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
+                try {
+                    columnarFrame.flush(columnWriter, tupleCount, 
maxColumnsInPageZerothSegment, lowKey.getTuple(),
+                            splitKey.getTuple(), this);
+                    updatePageLayoutStat();
+                } catch (Exception e) {
+                    logState(e);
+                    throw e;
+                }
+            }
+            columnWriter.close();
+            //We are done, return any temporary confiscated pages
+            for (ICachedPage page : tempConfiscatedPages) {
+                bufferCache.returnPage(page, false);
             }
-        }
-        columnWriter.close();
-        //We are done, return any temporary confiscated pages
-        for (ICachedPage page : tempConfiscatedPages) {
-            bufferCache.returnPage(page, false);
-        }
 
-        // For logging
-        int numberOfTempConfiscatedPages = tempConfiscatedPages.size();
-        tempConfiscatedPages.clear();
-        //Where Page0 and columns pages will be written
-        super.end();
+            // For logging
+            int numberOfTempConfiscatedPages = tempConfiscatedPages.size();
+            tempConfiscatedPages.clear();
+            //Where Page0 and columns pages will be written
+            super.end();
 
-        log("Finished", numberOfTempConfiscatedPages);
+            log("Finished", numberOfTempConfiscatedPages);
+        } finally {
+            // Release all column buffers held by this bulkloader
+            releaseBuffers();
+        }
     }
 
     @Override
     protected void writeFullLeafPage() throws HyracksDataException {
-        NodeFrontier leafFrontier = nodeFrontiers.get(0);
-        splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 
0);
-        splitKey.setLeftPage(leafFrontier.pageId);
-        if (tupleCount > 0) {
-            //We need to flush columns to confiscate all columns pages first 
before calling propagateBulk
-            try {
-                columnarFrame.flush(columnWriter, tupleCount, 
maxColumnsInPageZerothSegment, lowKey.getTuple(),
-                        splitKey.getTuple(), this);
-                updatePageLayoutStat();
-            } catch (Exception e) {
-                logState(e);
-                throw e;
+        try {
+            NodeFrontier leafFrontier = nodeFrontiers.get(0);
+            
splitKey.getTuple().resetByTupleOffset(splitKey.getBuffer().array(), 0);
+            splitKey.setLeftPage(leafFrontier.pageId);
+            if (tupleCount > 0) {
+                //We need to flush columns to confiscate all columns pages 
first before calling propagateBulk
+                try {
+                    columnarFrame.flush(columnWriter, tupleCount, 
maxColumnsInPageZerothSegment, lowKey.getTuple(),
+                            splitKey.getTuple(), this);
+                    updatePageLayoutStat();
+                } catch (Exception e) {
+                    logState(e);
+                    throw e;
+                }
             }
-        }
-
-        propagateBulk(1, pagesToWrite);
 
-        //Take a page for the next leaf
-        leafFrontier.pageId = freePageManager.takePage(metaFrame);
-        columnarFrame.setNextLeaf(leafFrontier.pageId);
+            propagateBulk(1, pagesToWrite);
 
-        /*
-         * Write columns' pages first to ensure they (columns' pages) are 
written before pageZero.
-         * It ensures pageZero does not land in between columns' pages if 
compression is enabled
-         */
-        writeColumnAndSegmentPages();
-        //Then write page0
-        write(leafFrontier.page);
+            //Take a page for the next leaf
+            leafFrontier.pageId = freePageManager.takePage(metaFrame);
+            columnarFrame.setNextLeaf(leafFrontier.pageId);
 
-        //Write interior nodes after writing columns pages
-        for (ICachedPage c : pagesToWrite) {
-            write(c);
-        }
+            /*
+             * Write columns' pages first to ensure they (columns' pages) are 
written before pageZero.
+             * It ensures pageZero does not land in between columns' pages if 
compression is enabled
+             */
+            writeColumnAndSegmentPages();
+            //Then write page0
+            write(leafFrontier.page);
 
-        // For logging
-        maxNumberOfPagesInALeafNode = Math.max(maxNumberOfPagesInALeafNode, 
numberOfPagesInCurrentLeafNode);
-        maxTupleCount = Math.max(maxTupleCount, tupleCount);
-        // Starts with 1 for page0
-        numberOfPagesInCurrentLeafNode = 1;
-        numberOfLeafNodes++;
+            //Write interior nodes after writing columns pages
+            for (ICachedPage c : pagesToWrite) {
+                write(c);
+            }
 
-        // Clear for next page
-        pagesToWrite.clear();
-        splitKey.setRightPage(leafFrontier.pageId);
-        setLowKey = true;
-        tupleCount = 0;
+            // For logging
+            maxNumberOfPagesInALeafNode = 
Math.max(maxNumberOfPagesInALeafNode, numberOfPagesInCurrentLeafNode);
+            maxTupleCount = Math.max(maxTupleCount, tupleCount);
+            // Starts with 1 for page0
+            numberOfPagesInCurrentLeafNode = 1;
+            numberOfLeafNodes++;
+
+            // Clear for next page
+            pagesToWrite.clear();
+            splitKey.setRightPage(leafFrontier.pageId);
+            setLowKey = true;
+            tupleCount = 0;
+        } finally {
+            // Release all column buffers held by this bulkloader for this leaf
+            releaseBuffers();
+        }
     }
 
     @Override
@@ -295,21 +406,29 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
 
     @Override
     public void abort() throws HyracksDataException {
-        for (ICachedPage page : columnsPages) {
-            bufferCache.returnPage(page, false);
-        }
+        try {
+            for (ICachedPage page : columnsPages) {
+                bufferCache.returnPage(page, false);
+            }
 
-        for (ICachedPage page : pageZeroSegments) {
-            bufferCache.returnPage(page, false);
-        }
+            for (ICachedPage page : pageZeroSegments) {
+                bufferCache.returnPage(page, false);
+            }
 
-        for (ICachedPage page : tempConfiscatedPages) {
-            bufferCache.returnPage(page, false);
-        }
-        super.abort();
+            for (ICachedPage page : tempConfiscatedPages) {
+                bufferCache.returnPage(page, false);
+            }
+            super.abort();
+            // Release the acquired column buffer back to the pool
+            columnWriter.abort();
 
-        // For logging
-        log("Aborted", tempConfiscatedPages.size());
+            // For logging
+            log("Aborted", tempConfiscatedPages.size());
+        } finally {
+
+            // Release all column buffers held by this bulkloader
+            releaseBuffers();
+        }
     }
 
     private void setSplitKey(ISplitKey splitKey, ITupleReference tuple) {
@@ -375,6 +494,16 @@ public final class ColumnBTreeBulkloader extends 
BTreeNSMBulkLoader implements I
         return page.getBuffer();
     }
 
+    @Override
+    public ByteBuffer confiscateTemporary0thBuffer() {
+        return columnBufferPool.getBuffer();
+    }
+
+    @Override
+    public void releaseTemporary0thBuffer(ByteBuffer buffer) {
+        columnBufferPool.recycle(buffer);
+    }
+
     private void logState(Exception e) {
         try {
             ObjectNode state = JSONUtil.createObject();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeFactory.java
index 1b9e198260..09a79a8d20 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeFactory.java
@@ -26,18 +26,22 @@ import 
org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
 import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool;
 
 public class ColumnBTreeFactory extends TreeIndexFactory<ColumnBTree> {
-    public ColumnBTreeFactory(IIOManager ioManager, IBufferCache bufferCache,
+    private final IColumnBufferPool columnBufferPool;
+
+    public ColumnBTreeFactory(IIOManager ioManager, IBufferCache bufferCache, 
IColumnBufferPool columnBufferPool,
             IPageManagerFactory freePageManagerFactory, ITreeIndexFrameFactory 
interiorFrameFactory,
             ITreeIndexFrameFactory leafFrameFactory, 
IBinaryComparatorFactory[] cmpFactories, int fieldCount) {
         super(ioManager, bufferCache, freePageManagerFactory, 
interiorFrameFactory, leafFrameFactory, cmpFactories,
                 fieldCount);
+        this.columnBufferPool = columnBufferPool;
     }
 
     @Override
     public ColumnBTree createIndexInstance(FileReference file) throws 
HyracksDataException {
-        return new ColumnBTree(bufferCache, 
freePageManagerFactory.createPageManager(bufferCache), interiorFrameFactory,
-                leafFrameFactory, cmpFactories, fieldCount, file);
+        return new ColumnBTree(bufferCache, columnBufferPool, 
freePageManagerFactory.createPageManager(bufferCache),
+                interiorFrameFactory, leafFrameFactory, cmpFactories, 
fieldCount, file);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
index 6050749443..022cb38b6a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
@@ -49,6 +49,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor.ICurs
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.IIndexCursorStats;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool;
 import org.apache.hyracks.util.trace.ITracer;
 
 public class LSMColumnBTree extends LSMBTree {
@@ -56,6 +57,7 @@ public class LSMColumnBTree extends LSMBTree {
     private final IColumnManager columnManager;
     private final IColumnIndexDiskCacheManager diskCacheManager;
     private final ILSMDiskComponentFactory mergeComponentFactory;
+    private final IColumnBufferPool columnBufferPool;
     /**
      * This column metadata only used during flush and dataset bulkload 
operations. We cannot have more than one
      * thread to do a flush/dataset bulkload. Do not use it for search/scan. 
Instead, use the latest component
@@ -68,13 +70,14 @@ public class LSMColumnBTree extends LSMBTree {
     public LSMColumnBTree(NCConfig storageConfig, IIOManager ioManager, 
List<IVirtualBufferCache> virtualBufferCaches,
             ITreeIndexFrameFactory interiorFrameFactory, 
ITreeIndexFrameFactory insertLeafFrameFactory,
             ITreeIndexFrameFactory deleteLeafFrameFactory, IBufferCache 
diskBufferCache,
-            ILSMIndexFileManager fileManager, ILSMDiskComponentFactory 
componentFactory,
-            ILSMDiskComponentFactory mergeComponentFactory, 
ILSMDiskComponentFactory bulkloadComponentFactory,
-            double bloomFilterFalsePositiveRate, int fieldCount, 
IBinaryComparatorFactory[] cmpFactories,
-            ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, 
ILSMIOOperationScheduler ioScheduler,
-            ILSMIOOperationCallbackFactory ioOpCallbackFactory, 
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
-            int[] btreeFields, ITracer tracer, IColumnManager columnManager, 
boolean atomic,
-            IColumnIndexDiskCacheManager diskCacheManager) throws 
HyracksDataException {
+            IColumnBufferPool columnBufferPool, ILSMIndexFileManager 
fileManager,
+            ILSMDiskComponentFactory componentFactory, 
ILSMDiskComponentFactory mergeComponentFactory,
+            ILSMDiskComponentFactory bulkloadComponentFactory, double 
bloomFilterFalsePositiveRate, int fieldCount,
+            IBinaryComparatorFactory[] cmpFactories, ILSMMergePolicy 
mergePolicy, ILSMOperationTracker opTracker,
+            ILSMIOOperationScheduler ioScheduler, 
ILSMIOOperationCallbackFactory ioOpCallbackFactory,
+            ILSMPageWriteCallbackFactory pageWriteCallbackFactory, int[] 
btreeFields, ITracer tracer,
+            IColumnManager columnManager, boolean atomic, 
IColumnIndexDiskCacheManager diskCacheManager)
+            throws HyracksDataException {
         super(storageConfig, ioManager, virtualBufferCaches, 
interiorFrameFactory, insertLeafFrameFactory,
                 deleteLeafFrameFactory, diskBufferCache, fileManager, 
componentFactory, bulkloadComponentFactory, null,
                 null, null, bloomFilterFalsePositiveRate, fieldCount, 
cmpFactories, mergePolicy, opTracker, ioScheduler,
@@ -83,6 +86,7 @@ public class LSMColumnBTree extends LSMBTree {
         this.columnManager = columnManager;
         this.mergeComponentFactory = mergeComponentFactory;
         this.diskCacheManager = diskCacheManager;
+        this.columnBufferPool = columnBufferPool;
     }
 
     @Override
@@ -146,6 +150,10 @@ public class LSMColumnBTree extends LSMBTree {
         return CURSOR_FACTORY;
     }
 
+    public IColumnBufferPool getColumnBufferPool() {
+        return columnBufferPool;
+    }
+
     @Override
     public IColumnIndexDiskCacheManager getDiskCacheManager() {
         return diskCacheManager;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
index c8815ae37b..a41561024e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
@@ -57,6 +57,7 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool;
 import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.util.trace.ITracer;
 
@@ -64,11 +65,11 @@ public class LSMColumnBTreeUtil {
 
     public static LSMBTree createLSMTree(NCConfig storageConfig, IIOManager 
ioManager,
             List<IVirtualBufferCache> virtualBufferCaches, FileReference file, 
IBufferCache diskBufferCache,
-            ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories, 
int[] bloomFilterKeyFields,
-            double bloomFilterFalsePositiveRate, ILSMMergePolicy mergePolicy, 
ILSMOperationTracker opTracker,
-            ILSMIOOperationScheduler ioScheduler, 
ILSMIOOperationCallbackFactory ioOpCallbackFactory,
-            ILSMPageWriteCallbackFactory pageWriteCallbackFactory, int[] 
btreeFields,
-            IMetadataPageManagerFactory freePageManagerFactory, boolean 
updateAware, ITracer tracer,
+            IColumnBufferPool columnBufferPool, ITypeTraits[] typeTraits, 
IBinaryComparatorFactory[] cmpFactories,
+            int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate, 
ILSMMergePolicy mergePolicy,
+            ILSMOperationTracker opTracker, ILSMIOOperationScheduler 
ioScheduler,
+            ILSMIOOperationCallbackFactory ioOpCallbackFactory, 
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
+            int[] btreeFields, IMetadataPageManagerFactory 
freePageManagerFactory, boolean updateAware, ITracer tracer,
             ICompressorDecompressorFactory compressorDecompressorFactory, 
ITypeTraits nullTypeTraits,
             INullIntrospector nullIntrospector, IColumnManagerFactory 
columnManagerFactory, boolean atomic,
             IDiskCacheMonitoringService diskCacheService) throws 
HyracksDataException {
@@ -104,13 +105,15 @@ public class LSMColumnBTreeUtil {
         ITreeIndexFrameFactory interiorFrameFactory = new 
BTreeNSMInteriorFrameFactory(insertTupleWriterFactory);
 
         // BTree factory
-        TreeIndexFactory<ColumnBTree> flushBTreeFactory = new 
ColumnBTreeFactory(ioManager, diskBufferCache,
-                freePageManagerFactory, interiorFrameFactory, 
flushLeafFrameFactory, cmpFactories, typeTraits.length);
-        TreeIndexFactory<ColumnBTree> mergeBTreeFactory = new 
ColumnBTreeFactory(ioManager, diskBufferCache,
-                freePageManagerFactory, interiorFrameFactory, 
mergeLeafFrameFactory, cmpFactories, typeTraits.length);
+        TreeIndexFactory<ColumnBTree> flushBTreeFactory =
+                new ColumnBTreeFactory(ioManager, diskBufferCache, 
columnBufferPool, freePageManagerFactory,
+                        interiorFrameFactory, flushLeafFrameFactory, 
cmpFactories, typeTraits.length);
+        TreeIndexFactory<ColumnBTree> mergeBTreeFactory =
+                new ColumnBTreeFactory(ioManager, diskBufferCache, 
columnBufferPool, freePageManagerFactory,
+                        interiorFrameFactory, mergeLeafFrameFactory, 
cmpFactories, typeTraits.length);
         TreeIndexFactory<ColumnBTree> bulkloadBTreeFactory =
-                new ColumnBTreeFactory(ioManager, diskBufferCache, 
freePageManagerFactory, interiorFrameFactory,
-                        bulkLoadLeafFrameFactory, cmpFactories, 
typeTraits.length);
+                new ColumnBTreeFactory(ioManager, diskBufferCache, 
columnBufferPool, freePageManagerFactory,
+                        interiorFrameFactory, bulkLoadLeafFrameFactory, 
cmpFactories, typeTraits.length);
 
         ILSMIndexFileManager fileNameManager = new 
LSMBTreeFileManager(ioManager, file, flushBTreeFactory, true,
                 compressorDecompressorFactory, diskCacheEnabled);
@@ -124,9 +127,9 @@ public class LSMColumnBTreeUtil {
                 new 
LSMColumnBTreeWithBloomFilterDiskComponentFactory(bulkloadBTreeFactory, 
bloomFilterFactory);
 
         return new LSMColumnBTree(storageConfig, ioManager, 
virtualBufferCaches, interiorFrameFactory,
-                insertLeafFrameFactory, deleteLeafFrameFactory, 
diskBufferCache, fileNameManager, flushComponentFactory,
-                mergeComponentFactory, bulkLoadComponentFactory, 
bloomFilterFalsePositiveRate, typeTraits.length,
-                cmpFactories, mergePolicy, opTracker, ioScheduler, 
ioOpCallbackFactory, pageWriteCallbackFactory,
-                btreeFields, tracer, columnManager, atomic, diskCacheManager);
+                insertLeafFrameFactory, deleteLeafFrameFactory, 
diskBufferCache, columnBufferPool, fileNameManager,
+                flushComponentFactory, mergeComponentFactory, 
bulkLoadComponentFactory, bloomFilterFalsePositiveRate,
+                typeTraits.length, cmpFactories, mergePolicy, opTracker, 
ioScheduler, ioOpCallbackFactory,
+                pageWriteCallbackFactory, btreeFields, tracer, columnManager, 
atomic, diskCacheManager);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
index 47492e8dab..d9557e661b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/IStorageManager.java
@@ -24,6 +24,7 @@ import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IJsonSerializable;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool;
 import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
@@ -43,6 +44,11 @@ public interface IStorageManager extends Serializable, 
IJsonSerializable {
      */
     IBufferCache getBufferCache(INCServiceContext ctx);
 
+    /**
+     * @return the column buffer pool {@link IColumnBufferPool}
+     */
+    IColumnBufferPool getColumnBufferPool(INCServiceContext ctx);
+
     /**
      * @param ctx the nc service context
      * @return the local resource repository {@link ILocalResourceRepository}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ColumnBufferPool.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ColumnBufferPool.java
new file mode 100644
index 0000000000..5a15f12978
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/ColumnBufferPool.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * ColumnBufferPool: a semaphore-based buffer pool for columnar buffer 
management.
+ *
+ * NOTE: getBuffer() and recycle() are called very frequently and are designed 
to be as lightweight as possible.
+ */
+public class ColumnBufferPool implements IColumnBufferPool, 
ILifeCycleComponent {
+    protected static final Logger LOGGER = LogManager.getLogger();
+
+    private final BlockingQueue<ByteBuffer> bufferPool;
+    private final AtomicLong totalAllocatedMemoryInBytes;
+    private final AtomicLong totalPooledMemoryInBytes;
+    private final AtomicLong numAllocNew;
+
+    // Semaphore for buffer-based allocation
+    private final Semaphore bufferSemaphore;
+    private final long maxBufferPoolMemoryLimit;
+    private final double columnBufferPoolMemoryPercentage;
+    private final int columnBufferInBytes;
+    private final int columnBufferPoolMaxSize;
+    private final int maxBuffers;
+
+    // Timeout for buffer reservation in milliseconds
+    private final long reserveTimeoutMillis;
+
+    /**
+     * @param columnBufferInBytes buffer size in bytes
+     * @param columnBufferPoolMaxSize max number of buffers in pool
+     * @param columnBufferPoolMemoryPercentage max percentage of total memory 
for pool
+     * @param reserveTimeoutMillis timeout in milliseconds for buffer 
reservation
+     */
+    public ColumnBufferPool(int columnBufferInBytes, int 
columnBufferPoolMaxSize,
+            double columnBufferPoolMemoryPercentage, long 
reserveTimeoutMillis) {
+        this.totalAllocatedMemoryInBytes = new AtomicLong(0);
+        this.bufferPool = new ArrayBlockingQueue<>(columnBufferPoolMaxSize);
+        this.columnBufferPoolMemoryPercentage = 
columnBufferPoolMemoryPercentage;
+        this.columnBufferPoolMaxSize = columnBufferPoolMaxSize;
+        this.reserveTimeoutMillis = reserveTimeoutMillis;
+        this.maxBufferPoolMemoryLimit = 
getMaxBufferPoolMemoryLimit(columnBufferInBytes,
+                columnBufferPoolMemoryPercentage, columnBufferPoolMaxSize);
+        this.maxBuffers = (int) (maxBufferPoolMemoryLimit / 
columnBufferInBytes);
+        this.bufferSemaphore = new Semaphore(maxBuffers, true);
+        this.columnBufferInBytes = columnBufferInBytes;
+        this.totalPooledMemoryInBytes = new AtomicLong(0);
+        this.numAllocNew = new AtomicLong(0);
+        initializePool();
+        LOGGER.info(
+                "ColumnBufferPool initialized: columnBufferPoolMaxSize={}, 
maxBufferPoolMemoryLimit={}, maxBuffers={}, columnBufferInBytes={}, 
reserveTimeoutMillis={}",
+                columnBufferPoolMaxSize, maxBufferPoolMemoryLimit, maxBuffers, 
columnBufferInBytes,
+                reserveTimeoutMillis);
+    }
+
+    /**
+     * Pre-allocate buffers for fast buffer access.
+     * The number of buffers pre-allocated is the minimum of half the pool 
size and the memory limit.
+     */
+    private void initializePool() {
+        int halfPoolSize = columnBufferPoolMaxSize / 2;
+        int numBuffersToAllocate = Math.min(halfPoolSize, maxBuffers);
+
+        for (int i = 0; i < numBuffersToAllocate; i++) {
+            ByteBuffer buffer = ByteBuffer.allocate(columnBufferInBytes);
+            bufferPool.add(buffer);
+            totalPooledMemoryInBytes.addAndGet(columnBufferInBytes);
+        }
+        LOGGER.info("ColumnBufferPool pre-allocated {} buffers ({} bytes 
each)", numBuffersToAllocate,
+                columnBufferInBytes);
+    }
+
+    /**
+     * Reserve the specified number of buffers, blocking up to the configured 
timeout if necessary.
+     * @param requestedBuffers number of buffers to reserve
+     * @throws InterruptedException if interrupted while waiting
+     * @throws IllegalStateException if unable to reserve within the timeout
+     */
+    @Override
+    public void reserve(int requestedBuffers) throws InterruptedException {
+        if (requestedBuffers <= 0) {
+            return;
+        }
+        boolean acquired = bufferSemaphore.tryAcquire(requestedBuffers, 
reserveTimeoutMillis, TimeUnit.MILLISECONDS);
+        if (!acquired) {
+            LOGGER.error("Failed to reserve {} buffers within {} ms ({} sec)", 
requestedBuffers, reserveTimeoutMillis,
+                    TimeUnit.MILLISECONDS.toSeconds(reserveTimeoutMillis));
+            throw new IllegalStateException("Timeout while reserving column 
buffers (" + requestedBuffers + ") after "
+                    + reserveTimeoutMillis + " ms");
+        }
+    }
+
+    @Override
+    public boolean tryReserve(int requestedBuffers) {
+        if (requestedBuffers <= 0) {
+            return true;
+        }
+        return bufferSemaphore.tryAcquire(requestedBuffers);
+    }
+
+    @Override
+    public void release(int buffers) {
+        if (buffers <= 0) {
+            return;
+        }
+        bufferSemaphore.release(buffers);
+    }
+
+    /**
+     * Fast path for buffer acquisition. Called very frequently.
+     */
+    @Override
+    public ByteBuffer getBuffer() {
+        // Fast path: try to poll from pool
+        ByteBuffer buffer = bufferPool.poll();
+        if (buffer != null) {
+            totalPooledMemoryInBytes.addAndGet(-columnBufferInBytes);
+            buffer.clear();
+            return buffer;
+        }
+
+        // Slow path: allocate new buffer if quota allows
+        ensureAvailableQuota();
+
+        numAllocNew.incrementAndGet();
+        buffer = ByteBuffer.allocate(columnBufferInBytes);
+        totalAllocatedMemoryInBytes.addAndGet(columnBufferInBytes);
+        return buffer;
+    }
+
+    /**
+     * Fast path for buffer recycling. Called very frequently.
+     */
+    @Override
+    public void recycle(ByteBuffer buffer) {
+        if (buffer == null) {
+            throw new IllegalStateException("buffer is null");
+        }
+
+        // Try to return to pool; if full, discard
+        if (bufferPool.offer(buffer)) {
+            totalPooledMemoryInBytes.addAndGet(columnBufferInBytes);
+        } else {
+            totalAllocatedMemoryInBytes.addAndGet(-columnBufferInBytes);
+        }
+    }
+
+    @Override
+    public int getMaxReservedBuffers() {
+        return maxBuffers;
+    }
+
+    /**
+     * Ensures that the total allocated memory does not exceed the 
maxBufferPoolMemoryLimit limit.
+     * Throws if the limit would be exceeded.
+     */
+    private void ensureAvailableQuota() {
+        long spaceAcquiredByPool = (long) columnBufferPoolMaxSize * 
columnBufferInBytes;
+        long totalAllocated = totalAllocatedMemoryInBytes.get();
+        long totalIfAllocated = totalAllocated + spaceAcquiredByPool;
+        if (totalIfAllocated > maxBufferPoolMemoryLimit) {
+            LOGGER.error(
+                    "Cannot allocate more buffers, memory limit reached. "
+                            + "totalAllocatedMemoryInBytes={}, 
maxBufferPoolMemoryLimit={}, columnBufferPoolMaxSize={}, 
columnBufferInBytes={}, "
+                            + "columnBufferPoolMemoryPercentage={}, 
totalIfAllocated={}, numAllocNew={}, availableBuffers={}. "
+                            + "Consider increasing 
storageColumnBufferPoolMemoryPercentage (current: {}).",
+                    totalAllocated, maxBufferPoolMemoryLimit, 
columnBufferPoolMaxSize, columnBufferInBytes,
+                    columnBufferPoolMemoryPercentage, totalIfAllocated, 
numAllocNew.get(),
+                    bufferSemaphore.availablePermits(), 
columnBufferPoolMemoryPercentage);
+            throw new IllegalStateException("Cannot allocate more buffers, 
maxBufferPoolMemoryLimit ("
+                    + maxBufferPoolMemoryLimit + ") reached.");
+        }
+    }
+
+    private long getMaxBufferPoolMemoryLimit(int columnBufferInBytes, double 
columnBufferPoolMemoryPercentage,
+            int columnBufferPoolMaxSize) {
+        long totalMemory = Runtime.getRuntime().totalMemory();
+        return (long) Math.max(totalMemory * (columnBufferPoolMemoryPercentage 
/ 100),
+                columnBufferInBytes * columnBufferPoolMaxSize);
+    }
+
+    @Override
+    public void close() {
+        bufferPool.clear();
+        totalPooledMemoryInBytes.set(0);
+        totalAllocatedMemoryInBytes.set(0);
+        LOGGER.info("ColumnBufferPool closed. numAllocNew={}", 
numAllocNew.get());
+    }
+
+    @Override
+    public void start() {
+        // No-op
+    }
+
+    @Override
+    public void dumpState(OutputStream os) throws IOException {
+        long pooledBytes = totalPooledMemoryInBytes.get();
+        long totalAllocatedBytes = totalAllocatedMemoryInBytes.get();
+        String buffer = "ColumnBufferPool State:\n" + 
"columnBufferPoolMaxSize: " + columnBufferPoolMaxSize + "\n"
+                + "Max Buffers: " + maxBuffers + "\n" + "Total Pooled Memory 
(bytes): " + pooledBytes + "\n"
+                + "Total Allocated Memory (bytes): " + totalAllocatedBytes + 
"\n" + "Number of Buffers Allocated New: "
+                + numAllocNew.get() + "\n" + "Available Buffers: " + 
bufferSemaphore.availablePermits();
+        os.write(buffer.getBytes());
+        LOGGER.info("dumpState called:\n{}", buffer);
+    }
+
+    @Override
+    public void stop(boolean dumpState, OutputStream ouputStream) throws 
IOException {
+        close();
+        if (dumpState && ouputStream != null) {
+            dumpState(ouputStream);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FreeColumnBufferPool.java
similarity index 55%
copy from 
asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java
copy to 
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FreeColumnBufferPool.java
index aa3cb71aaf..e47478f4b0 100644
--- 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/common/buffer/NoOpWriteMultiPageOp.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/FreeColumnBufferPool.java
@@ -16,41 +16,47 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.column.common.buffer;
+package org.apache.hyracks.storage.common.buffercache;
 
 import java.nio.ByteBuffer;
 
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.util.StorageUtil;
 
-public class NoOpWriteMultiPageOp implements IColumnWriteMultiPageOp {
-    public static final IColumnWriteMultiPageOp INSTANCE = new 
NoOpWriteMultiPageOp();
+public class FreeColumnBufferPool implements IColumnBufferPool {
+    private static final int INITIAL_BUFFER_SIZE = 
StorageUtil.getIntSizeInBytes(8, StorageUtil.StorageUnit.KILOBYTE);
+
+    @Override
+    public void reserve(int requestedBuffers) throws InterruptedException {
 
-    private NoOpWriteMultiPageOp() {
     }
 
     @Override
-    public ByteBuffer confiscatePersistent() throws HyracksDataException {
-        return null;
+    public boolean tryReserve(int requestedBuffers) {
+        return true;
     }
 
     @Override
-    public ByteBuffer confiscatePageZeroPersistent() throws 
HyracksDataException {
-        return null;
+    public void release(int buffers) {
+
     }
 
     @Override
-    public ByteBuffer confiscateTemporary() throws HyracksDataException {
-        return null;
+    public ByteBuffer getBuffer() {
+        return ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
     }
 
     @Override
-    public void persist() throws HyracksDataException {
+    public int getMaxReservedBuffers() {
+        return 0;
+    }
+
+    @Override
+    public void recycle(ByteBuffer buffer) {
 
     }
 
     @Override
-    public int getNumberOfPersistentBuffers() {
-        return 0;
+    public void close() {
+
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IColumnBufferPool.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IColumnBufferPool.java
new file mode 100644
index 0000000000..9fc3027799
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/buffercache/IColumnBufferPool.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common.buffercache;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Interface for a buffer-based pool managing column buffer allocation.
+ *
+ * The contract is:
+ * - Callers must reserve semaphore permits (blocking or non-blocking) before 
acquiring buffers.
+ * - Semaphore permits must be released when no longer needed.
+ * - Buffers are acquired via getBuffer() and returned via recycle().
+ * - The pool may block or fail to allocate if semaphore permits are exhausted.
+ */
+public interface IColumnBufferPool {
+
+    /**
+     * Reserve the specified number of buffers, blocking if necessary until 
available.
+     * @param requestedBuffers number of buffers to reserve
+     * @throws InterruptedException if interrupted while waiting
+     */
+    void reserve(int requestedBuffers) throws InterruptedException;
+
+    /**
+     * Attempt to reserve the specified number of buffers without blocking.
+     * @param requestedBuffers number of buffers to reserve
+     * @return true if buffers were reserved, false otherwise
+     */
+    boolean tryReserve(int requestedBuffers);
+
+    /**
+     * Release the specified number of previously reserved buffers.
+     * @param buffers number of buffers to release
+     */
+    void release(int buffers);
+
+    /**
+     * Acquire a buffer from the pool. Requires a reserved buffer.
+     * @return a ByteBuffer for use
+     */
+    ByteBuffer getBuffer();
+
+    /**
+     * Return a buffer to the pool for reuse.
+     * @param buffer the buffer to recycle
+     */
+    void recycle(ByteBuffer buffer);
+
+    /**
+     * Returns the maximum number of buffers that can be reserved from this 
pool.
+     * This represents the upper bound on concurrent buffer allocations.
+     *
+     * @return the maximum number of buffers available in the pool
+     */
+    int getMaxReservedBuffers();
+
+    /**
+     * Close the pool and release any resources.
+     */
+    void close();
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
index 0b4d2ed72b..ead6dd2d0c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManager.java
@@ -26,6 +26,7 @@ import 
org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
 import org.apache.hyracks.storage.common.IStorageManager;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool;
 import org.apache.hyracks.storage.common.disk.IDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.disk.NoOpDiskCacheMonitoringService;
 import org.apache.hyracks.storage.common.file.ResourceIdFactory;
@@ -47,6 +48,11 @@ public class TestStorageManager implements IStorageManager {
         return TestStorageManagerComponentHolder.getBufferCache(ctx);
     }
 
+    @Override
+    public IColumnBufferPool getColumnBufferPool(INCServiceContext ctx) {
+        return TestStorageManagerComponentHolder.getColumnBufferPool(ctx);
+    }
+
     @Override
     public ILocalResourceRepository 
getLocalResourceRepository(INCServiceContext ctx) {
         return TestStorageManagerComponentHolder.getLocalResourceRepository();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
index 7f696b4e95..5788823a94 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestStorageManagerComponentHolder.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -39,11 +40,13 @@ import 
org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.IResourceLifecycleManager;
 import org.apache.hyracks.storage.common.buffercache.BufferCache;
 import 
org.apache.hyracks.storage.common.buffercache.ClockPageReplacementStrategy;
+import org.apache.hyracks.storage.common.buffercache.ColumnBufferPool;
 import 
org.apache.hyracks.storage.common.buffercache.DefaultDiskCachedPageAllocator;
 import org.apache.hyracks.storage.common.buffercache.DelayPageCleanerPolicy;
 import org.apache.hyracks.storage.common.buffercache.HeapBufferAllocator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
+import org.apache.hyracks.storage.common.buffercache.IColumnBufferPool;
 import org.apache.hyracks.storage.common.buffercache.IPageReplacementStrategy;
 import 
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider;
 import org.apache.hyracks.storage.common.file.FileMapManager;
@@ -56,6 +59,7 @@ import 
org.apache.hyracks.storage.common.file.TransientLocalResourceRepositoryFa
 
 public class TestStorageManagerComponentHolder {
     private static IBufferCache bufferCache;
+    private static IColumnBufferPool columnBufferPool;
     private static IFileMapProvider fileMapProvider;
     private static IOManager ioManager;
     private static ILocalResourceRepository localResourceRepository;
@@ -76,6 +80,7 @@ public class TestStorageManagerComponentHolder {
         fileMapProvider = null;
         localResourceRepository = null;
         lcManager = null;
+        columnBufferPool = null;
     }
 
     public synchronized static IResourceLifecycleManager<IIndex> 
getIndexLifecycleManager() {
@@ -93,6 +98,13 @@ public class TestStorageManagerComponentHolder {
         return bufferCache;
     }
 
+    public synchronized static IColumnBufferPool 
getColumnBufferPool(INCServiceContext ctx) {
+        if (columnBufferPool == null) {
+            return getColumnBufferPool();
+        }
+        return columnBufferPool;
+    }
+
     private synchronized static IFileMapProvider getFileMapProvider() {
         if (fileMapProvider == null) {
             fileMapProvider = new FileMapManager();
@@ -142,6 +154,15 @@ public class TestStorageManagerComponentHolder {
         return resourceIdFactory;
     }
 
+    public static IColumnBufferPool getColumnBufferPool() {
+        if (columnBufferPool != null) {
+            return columnBufferPool;
+        }
+
+        columnBufferPool = new ColumnBufferPool(4 * 1024, 500, 3.0, 
TimeUnit.MINUTES.toMillis(2));
+        return columnBufferPool;
+    }
+
     public static IBufferCache getBufferCache(IIOManager ioManager) {
         if (bufferCache != null) {
             return bufferCache;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/ColumnBufferPoolTest.java
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/ColumnBufferPoolTest.java
new file mode 100644
index 0000000000..0428b48e06
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/ColumnBufferPoolTest.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hyracks.storage.common.buffercache.ColumnBufferPool;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ColumnBufferPoolTest {
+
+    private ColumnBufferPool columnBufferPool;
+    private static final int COLUMN_BUFFER_GRANULE_BYTES = 4096; // 4KB
+    private static final int POOL_SIZE = 10;
+    private static final double CAPPED_PERCENTAGE = 10.0; // 10% of runtime 
memory
+    private static final long RESERVE_TIMEOUT_MILLIS = 5000; // 5 seconds 
timeout
+
+    @Before
+    public void setUp() {
+        columnBufferPool =
+                new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, 
CAPPED_PERCENTAGE, RESERVE_TIMEOUT_MILLIS);
+    }
+
+    @After
+    public void tearDown() {
+        if (columnBufferPool != null) {
+            columnBufferPool.close();
+        }
+    }
+
+    @Test
+    public void testBasicCreditReservationAndRelease() throws 
InterruptedException {
+        // Reserve credits
+        columnBufferPool.reserve(5);
+        // Release credits
+        columnBufferPool.release(5);
+    }
+
+    @Test
+    public void testReserveExtraCreditsFor0thTuple() throws 
InterruptedException {
+        // Simulate scenario: initially reserve 3 credits, then need 8 total 
for 0th tuple
+        columnBufferPool.reserve(3);
+        // Need 5 more credits for 0th tuple (blocking call)
+        columnBufferPool.reserve(5); // This should block until credits are 
available
+        // Release all credits
+        columnBufferPool.release(8);
+    }
+
+    @Test
+    public void testTryAcquiringExtraCreditsForNon0thTuple() throws 
InterruptedException {
+        // Simulate scenario: initially have 2 credits, try to get 5 total 
(non-blocking)
+        columnBufferPool.reserve(2);
+        // Try to acquire 3 more credits for non-0th tuple (should succeed if 
available)
+        boolean success = columnBufferPool.tryReserve(3);
+        assertTrue("Should be able to reserve extra credits", success);
+        // Release all credits
+        columnBufferPool.release(5);
+    }
+
+    @Test
+    public void testTryReserveFailure() throws InterruptedException {
+        // Reserve most credits
+        int maxCredits = columnBufferPool.getMaxReservedBuffers();
+        columnBufferPool.reserve(maxCredits - 2);
+        // Try to reserve more than available
+        boolean success = columnBufferPool.tryReserve(5);
+        assertFalse("Should fail to reserve more credits than available", 
success);
+        // Release credits
+        columnBufferPool.release(maxCredits - 2);
+    }
+
+    @Test(expected = IllegalStateException.class, timeout = 10000)
+    public void testReserveTimeout() throws InterruptedException {
+        // Use a shorter timeout for this test
+        ColumnBufferPool shortTimeoutPool =
+                new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, 
CAPPED_PERCENTAGE, 100); // 100ms timeout
+        try {
+            // Reserve all credits
+            int maxCredits = shortTimeoutPool.getMaxReservedBuffers();
+            shortTimeoutPool.reserve(maxCredits);
+            // This should timeout and throw IllegalStateException
+            shortTimeoutPool.reserve(1);
+        } finally {
+            shortTimeoutPool.close();
+        }
+    }
+
+    @Test
+    public void testBufferAcquisitionAndRelease() throws InterruptedException {
+        // Reserve credits for buffers
+        columnBufferPool.reserve(3);
+        List<ByteBuffer> acquiredBuffers = new ArrayList<>();
+        // Acquire buffers
+        for (int i = 0; i < 3; i++) {
+            ByteBuffer buffer = columnBufferPool.getBuffer();
+            assertNotNull("Buffer should not be null", buffer);
+            assertEquals("Buffer should have correct capacity", 
COLUMN_BUFFER_GRANULE_BYTES, buffer.capacity());
+            acquiredBuffers.add(buffer);
+        }
+        // Release buffers
+        for (ByteBuffer buffer : acquiredBuffers) {
+            columnBufferPool.recycle(buffer);
+        }
+        // Release credits
+        columnBufferPool.release(3);
+    }
+
+    @Test
+    public void testBufferAcquisitionAndReleaseInBatches() throws 
InterruptedException {
+        // Step 1: Reserve credits upfront
+        columnBufferPool.reserve(6);
+
+        List<ByteBuffer> acquiredBuffers = new ArrayList<>();
+
+        // Step 2: Acquire buffers for first batch
+        for (int i = 0; i < 3; i++) {
+            ByteBuffer buffer = columnBufferPool.getBuffer();
+            assertNotNull(buffer);
+            assertEquals(COLUMN_BUFFER_GRANULE_BYTES, buffer.capacity());
+            acquiredBuffers.add(buffer);
+        }
+
+        // Step 3: Release buffers after first batch (flush happens)
+        for (ByteBuffer buffer : acquiredBuffers) {
+            columnBufferPool.recycle(buffer);
+        }
+        acquiredBuffers.clear();
+
+        // Step 4: Acquire buffers for second batch
+        for (int i = 0; i < 2; i++) {
+            ByteBuffer buffer = columnBufferPool.getBuffer();
+            assertNotNull(buffer);
+            acquiredBuffers.add(buffer);
+        }
+
+        // Step 5: Release buffers after second batch
+        for (ByteBuffer buffer : acquiredBuffers) {
+            columnBufferPool.recycle(buffer);
+        }
+
+        // Step 6: Release remaining credits
+        columnBufferPool.release(6);
+    }
+
+    @Test
+    public void testCompleteThreadLifecycle() throws InterruptedException {
+        // Step 1: Reserve initial credits
+        columnBufferPool.reserve(4);
+
+        // Step 2: Reserve extra credits for 0th tuple
+        columnBufferPool.reserve(3); // Now have 7 total
+
+        // Step 3: Process multiple batches
+        for (int batch = 0; batch < 3; batch++) {
+            List<ByteBuffer> batchBuffers = new ArrayList<>();
+
+            // Acquire buffers for this batch
+            for (int i = 0; i < 2; i++) {
+                ByteBuffer buffer = columnBufferPool.getBuffer();
+                assertNotNull(buffer);
+                batchBuffers.add(buffer);
+            }
+
+            // Simulate processing
+            Thread.sleep(10);
+
+            // Release buffers after batch processing
+            for (ByteBuffer buffer : batchBuffers) {
+                columnBufferPool.recycle(buffer);
+            }
+        }
+
+        // Step 4: Release remaining credits
+        columnBufferPool.release(7);
+    }
+
+    @Test
+    public void testMultipleThreadsConcurrentExecution() throws 
InterruptedException {
+        // Use shorter timeout to make timeouts happen faster in test
+        ColumnBufferPool testPool =
+                new ColumnBufferPool(COLUMN_BUFFER_GRANULE_BYTES, POOL_SIZE, 
CAPPED_PERCENTAGE, 500); // 500ms timeout
+
+        // Helper for timestamped log
+        java.util.function.Consumer<String> logWithTimestamp = msg -> {
+            System.out.println("[" + java.time.Instant.now() + "] " + msg);
+        };
+
+        try {
+            int numThreads = 5; // More threads to increase contention
+            int maxCredits = testPool.getMaxReservedBuffers();
+            logWithTimestamp.accept("maxCredits: " + maxCredits);
+            int creditsPerThread = (maxCredits * 3) / 4; // Each thread wants 
75% of total credits - forces contention!
+
+            ExecutorService executor = 
Executors.newFixedThreadPool(numThreads);
+            CountDownLatch startLatch = new CountDownLatch(1);
+            CountDownLatch completionLatch = new CountDownLatch(numThreads);
+            AtomicInteger successfulThreads = new AtomicInteger(0);
+            AtomicInteger threadOrder = new AtomicInteger(0);
+            AtomicInteger totalRetries = new AtomicInteger(0);
+
+            for (int threadId = 0; threadId < numThreads; threadId++) {
+                final int id = threadId;
+                executor.submit(() -> {
+                    try {
+                        startLatch.await();
+                        int threadRetryCount = 0;
+                        boolean threadSuccessful = false;
+
+                        // Thread-level retry: retry the entire thread 
operation
+                        while (!threadSuccessful && threadRetryCount < 100) {
+                            int totalReservedCredits = 0; // Track credits 
reserved in this attempt
+                            try {
+                                if (threadRetryCount > 0) {
+                                    totalRetries.incrementAndGet();
+                                    logWithTimestamp
+                                            .accept("Thread " + id + " 
starting retry attempt #" + threadRetryCount);
+                                    // Exponential backoff with 
thread-specific variation before retrying
+                                    int backoffTime = 100 + (threadRetryCount 
* 50) + (id * 25); // Thread-specific backoff
+                                    Thread.sleep(backoffTime);
+                                }
+
+                                int myOrder = threadOrder.incrementAndGet();
+
+                                // Step 1: Reserve initial credits (no 
individual retry - fail fast)
+                                int initialCredits = creditsPerThread / 2;
+                                long startTime = System.currentTimeMillis();
+                                testPool.reserve(initialCredits);
+                                totalReservedCredits += initialCredits; // 
Track successful reservation
+                                long reserveTime = System.currentTimeMillis() 
- startTime;
+
+                                logWithTimestamp.accept("Thread " + id + " 
(order: " + myOrder
+                                        + ") reserved initial credits (" + 
initialCredits + " credits) in "
+                                        + reserveTime + "ms"
+                                        + (threadRetryCount > 0 ? " on thread 
retry #" + threadRetryCount : ""));
+
+                                // Step 2: Reserve extra credits for 0th tuple 
(no individual retry - fail fast)
+                                int extraCredits = creditsPerThread / 2;
+                                startTime = System.currentTimeMillis();
+                                testPool.reserve(extraCredits); // Now have 
creditsPerThread total
+                                totalReservedCredits += extraCredits; // Track 
successful reservation
+                                reserveTime = System.currentTimeMillis() - 
startTime;
+
+                                logWithTimestamp.accept("Thread " + id + " 
reserved extra credits (" + extraCredits
+                                        + " credits) in " + reserveTime + 
"ms");
+
+                                // Step 3: Hold credits for longer than 
timeout to force other threads to timeout
+                                // Only the first thread (or first few) will 
succeed initially
+                                // Varied sleep patterns per thread to create 
different contention scenarios
+                                int baseSleep = 600 + (id * 100); // Thread 0: 
600ms, Thread 1: 700ms, etc.
+                                Thread.sleep(baseSleep);
+
+                                // Step 4: Process some buffers while holding 
many credits
+                                List<ByteBuffer> buffers = new ArrayList<>();
+                                for (int i = 0; i < Math.min(3, 
creditsPerThread); i++) {
+                                    ByteBuffer buffer = testPool.getBuffer();
+                                    buffers.add(buffer);
+                                }
+
+                                // Simulate longer processing to ensure 
timeouts occur
+                                // Different processing times per thread to 
create realistic workload patterns
+                                int processingTime = 150 + (id * 50); // 
Thread 0: 150ms, Thread 1: 200ms, etc.
+                                Thread.sleep(processingTime);
+
+                                // Release buffers but keep credits reserved
+                                for (ByteBuffer buffer : buffers) {
+                                    testPool.recycle(buffer);
+                                }
+
+                                // Hold credits even longer to guarantee 
timeouts
+                                // Staggered final sleep to create cascading 
release pattern
+                                int finalHoldTime = 250 + (id * 75); // Thread 
0: 250ms, Thread 1: 325ms, etc.
+                                Thread.sleep(finalHoldTime);
+
+                                // Step 5: Finally release all credits 
(allowing other threads to proceed)
+                                testPool.release(creditsPerThread);
+                                logWithTimestamp.accept("Thread " + id
+                                        + " completed successfully and 
released all credits" + (threadRetryCount > 0
+                                                ? " after " + threadRetryCount 
+ " thread retries" : ""));
+
+                                threadSuccessful = true; // Mark thread as 
successful
+                                successfulThreads.incrementAndGet();
+
+                            } catch (IllegalStateException e) {
+                                // Timeout occurred - cleanup partial credits 
and retry entire thread operation
+                                threadRetryCount++;
+                                if (totalReservedCredits > 0) {
+                                    testPool.release(totalReservedCredits);
+                                    logWithTimestamp
+                                            .accept("Thread " + id + " 
timeout, released " + totalReservedCredits
+                                                    + " partial credits, will 
retry entire thread operation (attempt "
+                                                    + threadRetryCount + ")");
+                                } else {
+                                    logWithTimestamp.accept(
+                                            "Thread " + id + " timeout, will 
retry entire thread operation (attempt "
+                                                    + threadRetryCount + ")");
+                                }
+
+                            } catch (Exception e) {
+                                // Other exceptions - cleanup partial credits 
and retry entire thread operation
+                                threadRetryCount++;
+                                if (totalReservedCredits > 0) {
+                                    testPool.release(totalReservedCredits);
+                                    logWithTimestamp.accept("Thread " + id + " 
error: " + e.getMessage() + ", released "
+                                            + totalReservedCredits
+                                            + " partial credits, will retry 
entire thread operation (attempt "
+                                            + threadRetryCount + ")");
+                                } else {
+                                    logWithTimestamp.accept("Thread " + id + " 
error: " + e.getMessage()
+                                            + ", will retry entire thread 
operation (attempt " + threadRetryCount
+                                            + ")");
+                                }
+                            }
+                        }
+
+                        if (!threadSuccessful) {
+                            logWithTimestamp.accept(
+                                    "Thread " + id + " failed after " + 
threadRetryCount + " thread retry attempts");
+                        }
+
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    } finally {
+                        completionLatch.countDown();
+                    }
+                });
+            }
+
+            // Start all threads at once to maximize contention
+            startLatch.countDown();
+
+            // Wait for completion with much longer timeout since threads will 
be holding credits for 1+ seconds and retrying
+            assertTrue("All threads should complete", 
completionLatch.await(120, TimeUnit.SECONDS));
+            assertEquals("All threads should succeed", numThreads, 
successfulThreads.get());
+
+            // Verify that thread-level retries actually occurred (confirms 
timeout/retry mechanism)
+            int totalRetriesCount = totalRetries.get();
+            logWithTimestamp.accept("Total thread retries across all threads: 
" + totalRetriesCount);
+            assertTrue("Should have some thread retries due to contention", 
totalRetriesCount > 0);
+
+            executor.shutdown();
+            assertTrue("Executor should terminate", 
executor.awaitTermination(15, TimeUnit.SECONDS));
+
+        } finally {
+            testPool.close();
+        }
+    }
+
+    @Test
+    public void testPoolCapacity() throws InterruptedException {
+        // Reserve credits for buffer operations
+        columnBufferPool.reserve(POOL_SIZE);
+
+        List<ByteBuffer> acquiredBuffers = new ArrayList<>();
+
+        // Acquire buffers up to initial pool size (POOL_SIZE/2)
+        for (int i = 0; i < POOL_SIZE / 2; i++) {
+            ByteBuffer buffer = columnBufferPool.getBuffer();
+            assertNotNull("Buffer " + i + " should not be null", buffer);
+            assertEquals("Buffer should have correct capacity", 
COLUMN_BUFFER_GRANULE_BYTES, buffer.capacity());
+            acquiredBuffers.add(buffer);
+        }
+
+        // Acquire additional buffers (should allocate new ones)
+        int additionalBuffers = POOL_SIZE / 2;
+        for (int i = 0; i < additionalBuffers; i++) {
+            ByteBuffer buffer = columnBufferPool.getBuffer();
+            assertNotNull("Additional buffer " + i + " should not be null", 
buffer);
+            assertEquals("Buffer should have correct capacity", 
COLUMN_BUFFER_GRANULE_BYTES, buffer.capacity());
+            acquiredBuffers.add(buffer);
+        }
+
+        assertEquals("Total buffers should match reserved credits", POOL_SIZE, 
acquiredBuffers.size());
+
+        // Release all buffers
+        for (ByteBuffer buffer : acquiredBuffers) {
+            columnBufferPool.recycle(buffer);
+        }
+
+        // Verify pool can still acquire buffers after release
+        ByteBuffer testBuffer = columnBufferPool.getBuffer();
+        assertNotNull("Should be able to acquire buffer after release", 
testBuffer);
+        assertEquals("Buffer should have correct capacity", 
COLUMN_BUFFER_GRANULE_BYTES, testBuffer.capacity());
+
+        // Release the test buffer
+        columnBufferPool.recycle(testBuffer);
+
+        // Release credits
+        columnBufferPool.release(POOL_SIZE);
+    }
+
+    @Test
+    public void testBootstrapInitialization() throws InterruptedException {
+        // Verify the pool can acquire pre-initialized buffers
+        columnBufferPool.reserve(POOL_SIZE / 2);
+
+        // Acquire all pre-initialized buffers
+        List<ByteBuffer> buffers = new ArrayList<>();
+        for (int i = 0; i < POOL_SIZE / 2; i++) {
+            ByteBuffer buffer = columnBufferPool.getBuffer();
+            assertNotNull("Pre-initialized buffer " + i + " should be 
available", buffer);
+            assertEquals("Buffer should have correct capacity", 
COLUMN_BUFFER_GRANULE_BYTES, buffer.capacity());
+            buffers.add(buffer);
+        }
+
+        // Release buffers
+        for (ByteBuffer buffer : buffers) {
+            columnBufferPool.recycle(buffer);
+        }
+
+        // Release credits
+        columnBufferPool.release(POOL_SIZE / 2);
+    }
+
+    @Test
+    public void testCreditManagement() throws InterruptedException {
+        int maxCredits = columnBufferPool.getMaxReservedBuffers();
+        assertTrue("Max credits should be positive", maxCredits > 0);
+        // Reserve credits
+        columnBufferPool.reserve(5);
+        // Try to reserve more
+        boolean success = columnBufferPool.tryReserve(3);
+        assertTrue("Should be able to reserve more credits", success);
+        // Release all credits
+        columnBufferPool.release(8);
+    }
+
+    @Test
+    public void testBufferRecycling() throws InterruptedException {
+        // Reserve credits
+        columnBufferPool.reserve(2);
+        // Acquire buffer
+        ByteBuffer buffer1 = columnBufferPool.getBuffer();
+        assertNotNull(buffer1);
+        // Recycle buffer
+        columnBufferPool.recycle(buffer1);
+        // Acquire another buffer (should potentially reuse)
+        ByteBuffer buffer2 = columnBufferPool.getBuffer();
+        assertNotNull(buffer2);
+        // Recycle second buffer
+        columnBufferPool.recycle(buffer2);
+        // Release credits
+        columnBufferPool.release(2);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testMemoryQuotaEnforcement() throws InterruptedException {
+        // Create a small pool to test quota enforcement
+        ColumnBufferPool smallPool = new ColumnBufferPool(1024, 2, 0.001, 
1000); // Very small percentage
+        try {
+            int maxCredits = smallPool.getMaxReservedBuffers();
+            smallPool.reserve(maxCredits);
+            // Try to acquire more buffers than the quota allows
+            List<ByteBuffer> buffers = new ArrayList<>();
+            for (int i = 0; i < maxCredits + 10; i++) { // Try to exceed quota
+                ByteBuffer buffer = smallPool.getBuffer();
+                buffers.add(buffer);
+            }
+        } finally {
+            smallPool.close();
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testRecycleNullBuffer() {
+        columnBufferPool.recycle(null);
+    }
+
+    @Test
+    public void testEdgeCases() throws InterruptedException {
+        // Test zero credits
+        columnBufferPool.reserve(0); // Should be no-op
+        columnBufferPool.release(0); // Should be no-op
+        // Test negative credits
+        assertTrue("tryReserve with negative should return true", 
columnBufferPool.tryReserve(-1));
+        // Test max credits
+        int maxCredits = columnBufferPool.getMaxReservedBuffers();
+        assertTrue("Max credits should be positive", maxCredits > 0);
+    }
+
+    // Helper method to access maxCredits
+    private int getMaxCredits() {
+        return columnBufferPool.getMaxReservedBuffers();
+    }
+}

Reply via email to