This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 746e3f5fea [ASTERIXDB-3389][STO] Support caching/evicting columns
746e3f5fea is described below

commit 746e3f5fea806db0bf219a2ba56554b1a2797bd4
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Wed May 8 21:58:37 2024 -0700

    [ASTERIXDB-3389][STO] Support caching/evicting columns
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    Prepare columnar indexes to support caching and
    evicting columns.
    
    Change-Id: Ib557608b0b25219ffc00a76325091a92f5e77698
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18260
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Wail Alkowaileet <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../AbstractColumnImmutableReadMetadata.java       |  12 +-
 .../operation/lsm/flush/BatchFinalizerVisitor.java |   5 +-
 .../flush/FlushColumnTupleReaderWriterFactory.java |   4 +-
 .../lsm/flush/FlushColumnTupleWithMetaWriter.java  |   5 +-
 .../lsm/flush/FlushColumnTupleWriter.java          |   6 +-
 .../load/LoadColumnTupleReaderWriterFactory.java   |   2 +-
 .../operation/lsm/load/LoadColumnTupleWriter.java  |   5 +-
 .../lsm/merge/MergeColumnReadMetadata.java         |   4 +-
 .../merge/MergeColumnTupleReaderWriterFactory.java |   3 +-
 .../lsm/merge/MergeColumnTupleWriter.java          |  12 +-
 .../create/PrimaryScanColumnTupleProjector.java    |   4 +-
 .../upsert/UpsertPreviousColumnTupleProjector.java |   4 +-
 .../operation/query/QueryColumnMetadata.java       |  11 +-
 .../operation/query/QueryColumnTupleProjector.java |   8 +-
 .../query/QueryColumnTupleProjectorFactory.java    |   4 +-
 .../query/QueryColumnWithMetaMetadata.java         |  13 +-
 .../query/QueryColumnWithMetaTupleProjector.java   |   6 +-
 .../asterix/column/values/IColumnBatchWriter.java  |  10 +-
 .../column/values/writer/ColumnBatchWriter.java    |  41 ++-
 .../column/test/bytes/AbstractBytesTest.java       |   3 +-
 .../asterix/column/test/bytes/FlushLargeTest.java  |  13 +-
 .../asterix/column/test/bytes/FlushSmallTest.java  |  13 +-
 .../asterix/column/test/dummy/AssemblerTest.java   |   3 +-
 .../values/writer/NoOpColumnBatchWriter.java       |  11 +-
 .../hyracks-storage-am-lsm-btree-column/pom.xml    |  10 +
 ...rojectionInfo.java => ColumnProjectorType.java} |  34 +--
 .../api/projection/IColumnProjectionInfo.java      |   5 +
 .../cloud/CloudColumnIndexDiskCacheManager.java    | 103 +++++++
 .../am/lsm/btree/column/cloud/ColumnRanges.java    | 316 +++++++++++++++++++++
 .../buffercache/read/CloudColumnReadContext.java   | 197 +++++++++++++
 .../buffercache/read/CloudMegaPageReadContext.java | 202 +++++++++++++
 .../buffercache/write/CloudColumnWriteContext.java | 152 ++++++++++
 .../column/cloud/sweep/ColumnSweepLockInfo.java    |  56 ++++
 .../column/cloud/sweep/ColumnSweepPlanner.java     | 260 +++++++++++++++++
 .../btree/column/cloud/sweep/ColumnSweeper.java    | 220 ++++++++++++++
 .../column/cloud/sweep/ColumnSweeperUtil.java      |  55 ++++
 .../lsm/btree/column/cloud/sweep/ISweepClock.java  |  24 ++
 .../cloud/sweep/SweepBufferCacheReadContext.java   |  68 +++++
 .../column/cloud/sweep/ColumnSweepPlannerTest.java | 139 +++++++++
 .../column/dummy/DummyColumnProjectionInfo.java    |  72 +++++
 .../am/lsm/btree/column/dummy/DummySweepClock.java |  34 +++
 .../am/lsm/common/util/LSMComponentIdUtils.java    |  12 +-
 42 files changed, 2059 insertions(+), 102 deletions(-)

diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
index 5ac38d7f19..b7c40bd18b 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
@@ -21,17 +21,27 @@ package org.apache.asterix.column.metadata;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.data.std.api.IValueReference;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
 
 public abstract class AbstractColumnImmutableReadMetadata extends 
AbstractColumnImmutableMetadata
         implements IColumnProjectionInfo {
+    private final ColumnProjectorType projectorType;
+
     protected AbstractColumnImmutableReadMetadata(ARecordType datasetType, 
ARecordType metaType,
-            int numberOfPrimaryKeys, IValueReference serializedMetadata, int 
numberOfColumns) {
+            int numberOfPrimaryKeys, IValueReference serializedMetadata, int 
numberOfColumns,
+            ColumnProjectorType projectorType) {
         super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, 
numberOfColumns);
+        this.projectorType = projectorType;
     }
 
     /**
      * @return the corresponding reader (merge reader or query reader) given 
<code>this</code> metadata
      */
     public abstract AbstractColumnTupleReader createTupleReader();
+
+    @Override
+    public final ColumnProjectorType getProjectorType() {
+        return projectorType;
+    }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
index 4cbe09bc9c..951a9fe7e6 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
@@ -59,9 +59,8 @@ public final class BatchFinalizerVisitor implements 
ISchemaNodeVisitor<Void, Abs
             columnMetadata.getMetaRoot().accept(this, null);
         }
 
-        int allocatedSpace = 
batchWriter.writePrimaryKeyColumns(primaryKeyWriters);
-        allocatedSpace += batchWriter.writeColumns(orderedColumns);
-        return allocatedSpace;
+        batchWriter.writePrimaryKeyColumns(primaryKeyWriters);
+        return batchWriter.writeColumns(orderedColumns);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
index f597e4f694..c64b074e2e 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
@@ -41,10 +41,10 @@ public class FlushColumnTupleReaderWriterFactory extends 
AbstractColumnTupleRead
         if (flushColumnMetadata.getMetaType() == null) {
             //no meta
             return new FlushColumnTupleWriter(flushColumnMetadata, pageSize, 
maxNumberOfTuples, tolerance,
-                    maxLeafNodeSize);
+                    maxLeafNodeSize, writeContext);
         }
         return new FlushColumnTupleWithMetaWriter(flushColumnMetadata, 
pageSize, maxNumberOfTuples, tolerance,
-                maxLeafNodeSize);
+                maxLeafNodeSize, writeContext);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
index b51b3953f7..a1abf5e8a8 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
@@ -21,6 +21,7 @@ package org.apache.asterix.column.operation.lsm.flush;
 import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
 import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
 
 public class FlushColumnTupleWithMetaWriter extends FlushColumnTupleWriter {
@@ -28,8 +29,8 @@ public class FlushColumnTupleWithMetaWriter extends 
FlushColumnTupleWriter {
     private final RecordLazyVisitablePointable metaPointable;
 
     public FlushColumnTupleWithMetaWriter(FlushColumnMetadata columnMetadata, 
int pageSize, int maxNumberOfTuples,
-            double tolerance, int maxLeafNodeSize) {
-        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, 
maxLeafNodeSize);
+            double tolerance, int maxLeafNodeSize, IColumnWriteContext 
writeContext) {
+        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, 
maxLeafNodeSize, writeContext);
         metaColumnTransformer = new ColumnTransformer(columnMetadata, 
columnMetadata.getMetaRoot());
         metaPointable = new 
TypedRecordLazyVisitablePointable(columnMetadata.getMetaType());
     }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index 41cad4932c..65f5eb44cf 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
 
 public class FlushColumnTupleWriter extends AbstractColumnTupleWriter {
@@ -45,11 +46,11 @@ public class FlushColumnTupleWriter extends 
AbstractColumnTupleWriter {
     protected int primaryKeysEstimatedSize;
 
     public FlushColumnTupleWriter(FlushColumnMetadata columnMetadata, int 
pageSize, int maxNumberOfTuples,
-            double tolerance, int maxLeafNodeSize) {
+            double tolerance, int maxLeafNodeSize, IColumnWriteContext 
writeContext) {
         this.columnMetadata = columnMetadata;
         transformer = new ColumnTransformer(columnMetadata, 
columnMetadata.getRoot());
         finalizer = new BatchFinalizerVisitor(columnMetadata);
-        writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), 
pageSize, tolerance);
+        writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), 
pageSize, tolerance, writeContext);
         this.maxNumberOfTuples = maxNumberOfTuples;
         this.maxLeafNodeSize = maxLeafNodeSize;
         pointable = new 
TypedRecordLazyVisitablePointable(columnMetadata.getDatasetType());
@@ -105,6 +106,7 @@ public class FlushColumnTupleWriter extends 
AbstractColumnTupleWriter {
     @Override
     public final void close() {
         columnMetadata.close();
+        writer.close();
     }
 
     @Override
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
index 85569f90cc..531502efab 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
@@ -40,6 +40,6 @@ public class LoadColumnTupleReaderWriterFactory extends 
FlushColumnTupleReaderWr
     public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata 
columnMetadata,
             IColumnWriteContext writeContext) {
         return new LoadColumnTupleWriter((FlushColumnMetadata) columnMetadata, 
pageSize, maxNumberOfTuples, tolerance,
-                maxLeafNodeSize, MultiComparator.create(cmpFactories));
+                maxLeafNodeSize, MultiComparator.create(cmpFactories), 
writeContext);
     }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
index ca14000d45..0ba49cf9bf 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import 
org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 import org.apache.hyracks.storage.common.MultiComparator;
 
 public class LoadColumnTupleWriter extends FlushColumnTupleWriter {
@@ -32,8 +33,8 @@ public class LoadColumnTupleWriter extends 
FlushColumnTupleWriter {
     private final MultiComparator comparator;
 
     public LoadColumnTupleWriter(FlushColumnMetadata columnMetadata, int 
pageSize, int maxNumberOfTuples,
-            double tolerance, int maxLeafNodeSize, MultiComparator comparator) 
{
-        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, 
maxLeafNodeSize);
+            double tolerance, int maxLeafNodeSize, MultiComparator comparator, 
IColumnWriteContext writeContext) {
+        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, 
maxLeafNodeSize, writeContext);
         prevTupleKeys =
                 
PointableTupleReference.create(columnMetadata.getNumberOfPrimaryKeys(), 
ArrayBackedValueStorage::new);
         this.comparator = comparator;
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
index a1eff69c4b..0152f5a908 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.lsm.merge;
 
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -43,7 +45,7 @@ public final class MergeColumnReadMetadata extends 
AbstractColumnImmutableReadMe
 
     private MergeColumnReadMetadata(ARecordType datasetType, ARecordType 
metaType, int numberOfPrimaryKeys,
             IColumnValuesReader[] columnReaders, IValueReference 
serializedMetadata) {
-        super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, 
columnReaders.length);
+        super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, 
columnReaders.length, MERGE);
         this.columnReaders = columnReaders;
     }
 
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
index d792855792..3f98c5fb62 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
@@ -38,7 +38,8 @@ public class MergeColumnTupleReaderWriterFactory extends 
AbstractColumnTupleRead
     public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata 
columnMetadata,
             IColumnWriteContext writeContext) {
         MergeColumnWriteMetadata mergeWriteMetadata = 
(MergeColumnWriteMetadata) columnMetadata;
-        return new MergeColumnTupleWriter(mergeWriteMetadata, pageSize, 
maxNumberOfTuples, tolerance, maxLeafNodeSize);
+        return new MergeColumnTupleWriter(mergeWriteMetadata, pageSize, 
maxNumberOfTuples, tolerance, maxLeafNodeSize,
+                writeContext);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index d3c102a6c4..5912a3bf8c 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -34,6 +34,7 @@ import 
org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.error.ColumnarValueException;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -52,7 +53,7 @@ public class MergeColumnTupleWriter extends 
AbstractColumnTupleWriter {
     private int numberOfAntiMatter;
 
     public MergeColumnTupleWriter(MergeColumnWriteMetadata columnMetadata, int 
pageSize, int maxNumberOfTuples,
-            double tolerance, int maxLeafNodeSize) {
+            double tolerance, int maxLeafNodeSize, IColumnWriteContext 
writeContext) {
         this.columnMetadata = columnMetadata;
         this.maxLeafNodeSize = maxLeafNodeSize;
         List<IColumnTupleIterator> componentsTuplesList = 
columnMetadata.getComponentsTuples();
@@ -68,7 +69,7 @@ public class MergeColumnTupleWriter extends 
AbstractColumnTupleWriter {
         }
         this.maxNumberOfTuples = getMaxNumberOfTuples(maxNumberOfTuples, 
totalNumberOfTuples, totalLength);
         this.writtenComponents = new RunLengthIntArray();
-        writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), 
pageSize, tolerance);
+        writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), 
pageSize, tolerance, writeContext);
         writtenComponents.reset();
         primaryKeyWriters = new 
IColumnValuesWriter[columnMetadata.getNumberOfPrimaryKeys()];
         for (int i = 0; i < primaryKeyWriters.length; i++) {
@@ -142,16 +143,17 @@ public class MergeColumnTupleWriter extends 
AbstractColumnTupleWriter {
             orderedColumns.add(columnMetadata.getWriter(i));
         }
         writer.setPageZeroBuffer(pageZero, numberOfColumns, 
numberOfPrimaryKeys);
-        int allocatedSpace = writer.writePrimaryKeyColumns(primaryKeyWriters);
-        allocatedSpace += writer.writeColumns(orderedColumns);
+        writer.writePrimaryKeyColumns(primaryKeyWriters);
+        int totalLength = writer.writeColumns(orderedColumns);
 
         numberOfAntiMatter = 0;
-        return allocatedSpace;
+        return totalLength;
     }
 
     @Override
     public void close() {
         columnMetadata.close();
+        writer.close();
     }
 
     private void writePrimaryKeys(MergeColumnTupleReference columnTuple) 
throws HyracksDataException {
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/create/PrimaryScanColumnTupleProjector.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/create/PrimaryScanColumnTupleProjector.java
index c86318e1fc..57ad0ac6a4 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/create/PrimaryScanColumnTupleProjector.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/create/PrimaryScanColumnTupleProjector.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.lsm.secondary.create;
 
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collections;
@@ -40,7 +42,7 @@ final class PrimaryScanColumnTupleProjector implements 
IColumnTupleProjector {
             ARecordType requestedType) {
         projector = new QueryColumnTupleProjector(datasetType, 
numberOfPrimaryKeys, requestedType,
                 Collections.emptyMap(), 
NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                NoOpColumnFilterEvaluatorFactory.INSTANCE, 
NoOpWarningCollector.INSTANCE, null);
+                NoOpColumnFilterEvaluatorFactory.INSTANCE, 
NoOpWarningCollector.INSTANCE, null, MODIFY);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/upsert/UpsertPreviousColumnTupleProjector.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/upsert/UpsertPreviousColumnTupleProjector.java
index 34b029186d..2b95c1a622 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/upsert/UpsertPreviousColumnTupleProjector.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/upsert/UpsertPreviousColumnTupleProjector.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.lsm.secondary.upsert;
 
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collections;
@@ -42,7 +44,7 @@ final class UpsertPreviousColumnTupleProjector implements 
IColumnTupleProjector
         builder = new ArrayTupleBuilder(numberOfPrimaryKeys + 1);
         projector = new QueryColumnTupleProjector(datasetType, 
numberOfPrimaryKeys, requestedType,
                 Collections.emptyMap(), 
NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                NoOpColumnFilterEvaluatorFactory.INSTANCE, 
NoOpWarningCollector.INSTANCE, null);
+                NoOpColumnFilterEvaluatorFactory.INSTANCE, 
NoOpWarningCollector.INSTANCE, null, MODIFY);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
index d50af3808c..a697e0d7e7 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
@@ -55,6 +55,7 @@ import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.apache.hyracks.util.LogRedactionUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -78,9 +79,9 @@ public class QueryColumnMetadata extends 
AbstractColumnImmutableReadMetadata {
             FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, 
IColumnValuesReaderFactory readerFactory,
             IValueGetterFactory valueGetterFactory, IColumnFilterEvaluator 
normalizedFilterEvaluator,
             List<IColumnRangeFilterValueAccessor> filterValueAccessors,
-            IColumnIterableFilterEvaluator columnFilterEvaluator, 
List<IColumnValuesReader> filterColumnReaders)
-            throws HyracksDataException {
-        super(datasetType, metaType, primaryKeyReaders.length, 
serializedMetadata, -1);
+            IColumnIterableFilterEvaluator columnFilterEvaluator, 
List<IColumnValuesReader> filterColumnReaders,
+            ColumnProjectorType projectorType) throws HyracksDataException {
+        super(datasetType, metaType, primaryKeyReaders.length, 
serializedMetadata, -1, projectorType);
         this.fieldNamesDictionary = fieldNamesDictionary;
         this.primaryKeyReaders = primaryKeyReaders;
         this.normalizedFilterEvaluator = normalizedFilterEvaluator;
@@ -175,7 +176,7 @@ public class QueryColumnMetadata extends 
AbstractColumnImmutableReadMetadata {
             Map<String, FunctionCallInformation> functionCallInfoMap,
             IColumnRangeFilterEvaluatorFactory normalizedEvaluatorFactory,
             IColumnIterableFilterEvaluatorFactory 
columnFilterEvaluatorFactory, IWarningCollector warningCollector,
-            IHyracksTaskContext context) throws IOException {
+            IHyracksTaskContext context, ColumnProjectorType projectorType) 
throws IOException {
         byte[] bytes = serializedMetadata.getByteArray();
         int offset = serializedMetadata.getStartOffset();
         int length = serializedMetadata.getLength();
@@ -230,7 +231,7 @@ public class QueryColumnMetadata extends 
AbstractColumnImmutableReadMetadata {
 
         return new QueryColumnMetadata(datasetType, null, primaryKeyReaders, 
serializedMetadata, fieldNamesDictionary,
                 clippedRoot, readerFactory, valueGetterFactory, 
normalizedFilterEvaluator, filterValueAccessors,
-                columnFilterEvaluator, filterColumnReaders);
+                columnFilterEvaluator, filterColumnReaders, projectorType);
     }
 
     protected static ObjectSchemaNode clip(ARecordType requestedType, 
ObjectSchemaNode root,
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjector.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjector.java
index 369a891aa7..d17cfbd241 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjector.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjector.java
@@ -37,6 +37,7 @@ import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
 
@@ -47,6 +48,7 @@ public class QueryColumnTupleProjector implements 
IColumnTupleProjector {
     protected final Map<String, FunctionCallInformation> functionCallInfoMap;
     protected final IWarningCollector warningCollector;
     protected final IHyracksTaskContext context;
+    protected final ColumnProjectorType projectorType;
     protected final IColumnRangeFilterEvaluatorFactory 
normalizedFilterEvaluatorFactory;
     protected final IColumnIterableFilterEvaluatorFactory 
columnFilterEvaluatorFactory;
     private final AssembledTupleReference assembledTupleReference;
@@ -55,7 +57,7 @@ public class QueryColumnTupleProjector implements 
IColumnTupleProjector {
             Map<String, FunctionCallInformation> functionCallInfoMap,
             IColumnRangeFilterEvaluatorFactory 
normalizedFilterEvaluatorFactory,
             IColumnIterableFilterEvaluatorFactory 
columnFilterEvaluatorFactory, IWarningCollector warningCollector,
-            IHyracksTaskContext context) {
+            IHyracksTaskContext context, ColumnProjectorType projectorType) {
         this.datasetType = datasetType;
         this.numberOfPrimaryKeys = numberOfPrimaryKeys;
         this.requestedType = requestedType;
@@ -64,6 +66,7 @@ public class QueryColumnTupleProjector implements 
IColumnTupleProjector {
         this.columnFilterEvaluatorFactory = columnFilterEvaluatorFactory;
         this.warningCollector = warningCollector;
         this.context = context;
+        this.projectorType = projectorType;
         assembledTupleReference = new 
AssembledTupleReference(getNumberOfTupleFields());
     }
 
@@ -72,7 +75,8 @@ public class QueryColumnTupleProjector implements 
IColumnTupleProjector {
         try {
             return QueryColumnMetadata.create(datasetType, 
numberOfPrimaryKeys, serializedMetadata,
                     new ColumnValueReaderFactory(), 
ValueGetterFactory.INSTANCE, requestedType, functionCallInfoMap,
-                    normalizedFilterEvaluatorFactory, 
columnFilterEvaluatorFactory, warningCollector, context);
+                    normalizedFilterEvaluatorFactory, 
columnFilterEvaluatorFactory, warningCollector, context,
+                    projectorType);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjectorFactory.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjectorFactory.java
index 0265a050cd..e333405e77 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjectorFactory.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjectorFactory.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.query;
 
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+
 import java.util.Map;
 
 import 
org.apache.asterix.column.filter.iterable.IColumnIterableFilterEvaluatorFactory;
@@ -61,7 +63,7 @@ public class QueryColumnTupleProjectorFactory implements 
ITupleProjectorFactory
         if (requestedMetaType == null) {
             // The dataset does not contain a meta part
             return new QueryColumnTupleProjector(datasetType, 
numberOfPrimaryKeys, requestedType, functionCallInfo,
-                    rangeFilterEvaluatorFactory, columnFilterEvaluatorFactory, 
warningCollector, context);
+                    rangeFilterEvaluatorFactory, columnFilterEvaluatorFactory, 
warningCollector, context, QUERY);
         }
         // The dataset has a meta part
         return new QueryColumnWithMetaTupleProjector(datasetType, metaType, 
numberOfPrimaryKeys, requestedType,
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
index 34e40ffa3d..8b23c059b0 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
@@ -51,6 +51,7 @@ import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 
 /**
  * Query column metadata (with metaRecord)
@@ -63,10 +64,11 @@ public final class QueryColumnWithMetaMetadata extends 
QueryColumnMetadata {
             FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, 
ObjectSchemaNode metaRoot,
             IColumnValuesReaderFactory readerFactory, IValueGetterFactory 
valueGetterFactory,
             IColumnFilterEvaluator filterEvaluator, 
List<IColumnRangeFilterValueAccessor> filterValueAccessors,
-            IColumnIterableFilterEvaluator columnFilterEvaluator, 
List<IColumnValuesReader> filterColumnReaders)
-            throws HyracksDataException {
+            IColumnIterableFilterEvaluator columnFilterEvaluator, 
List<IColumnValuesReader> filterColumnReaders,
+            ColumnProjectorType projectorType) throws HyracksDataException {
         super(datasetType, metaType, primaryKeyReaders, serializedMetadata, 
fieldNamesDictionary, root, readerFactory,
-                valueGetterFactory, filterEvaluator, filterValueAccessors, 
columnFilterEvaluator, filterColumnReaders);
+                valueGetterFactory, filterEvaluator, filterValueAccessors, 
columnFilterEvaluator, filterColumnReaders,
+                projectorType);
         metaAssembler = new ColumnAssembler(metaRoot, metaType, this, 
readerFactory, valueGetterFactory);
     }
 
@@ -121,7 +123,7 @@ public final class QueryColumnWithMetaMetadata extends 
QueryColumnMetadata {
             Map<String, FunctionCallInformation> functionCallInfo, ARecordType 
metaRequestedType,
             IColumnRangeFilterEvaluatorFactory normalizedEvaluatorFactory,
             IColumnIterableFilterEvaluatorFactory 
columnFilterEvaluatorFactory, IWarningCollector warningCollector,
-            IHyracksTaskContext context) throws IOException {
+            IHyracksTaskContext context, ColumnProjectorType projectorType) 
throws IOException {
         byte[] bytes = serializedMetadata.getByteArray();
         int offset = serializedMetadata.getStartOffset();
         int length = serializedMetadata.getLength();
@@ -179,6 +181,7 @@ public final class QueryColumnWithMetaMetadata extends 
QueryColumnMetadata {
 
         return new QueryColumnWithMetaMetadata(datasetType, metaType, 
primaryKeyReaders, serializedMetadata,
                 fieldNamesDictionary, clippedRoot, metaClippedRoot, 
readerFactory, valueGetterFactory,
-                normalizedFilterEvaluator, filterValueAccessors, 
columnFilterEvaluator, filterColumnReaders);
+                normalizedFilterEvaluator, filterValueAccessors, 
columnFilterEvaluator, filterColumnReaders,
+                projectorType);
     }
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaTupleProjector.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaTupleProjector.java
index f31c7ccd54..74524fe55a 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaTupleProjector.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaTupleProjector.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.query;
 
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Map;
@@ -47,7 +49,7 @@ public class QueryColumnWithMetaTupleProjector extends 
QueryColumnTupleProjector
             IColumnIterableFilterEvaluatorFactory 
columnFilterEvaluatorFactory, IWarningCollector warningCollector,
             IHyracksTaskContext context) {
         super(datasetType, numberOfPrimaryKeys, requestedType, 
functionCallInfoMap, filterEvaluator,
-                columnFilterEvaluatorFactory, warningCollector, context);
+                columnFilterEvaluatorFactory, warningCollector, context, 
QUERY);
         this.metaType = metaType;
         this.requestedMetaType = requestedMetaType;
     }
@@ -58,7 +60,7 @@ public class QueryColumnWithMetaTupleProjector extends 
QueryColumnTupleProjector
             return QueryColumnWithMetaMetadata.create(datasetType, metaType, 
numberOfPrimaryKeys, serializedMetadata,
                     new ColumnValueReaderFactory(), 
ValueGetterFactory.INSTANCE, requestedType, functionCallInfoMap,
                     requestedMetaType, normalizedFilterEvaluatorFactory, 
columnFilterEvaluatorFactory, warningCollector,
-                    context);
+                    context, projectorType);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
index fc1173fcec..063743d8ae 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
@@ -30,15 +30,19 @@ public interface IColumnBatchWriter {
      * Writes the primary keys' values to Page0
      *
      * @param primaryKeyWriters primary keys' writers
-     * @return the allocated space for the primary keys' writers
      */
-    int writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws 
HyracksDataException;
+    void writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) 
throws HyracksDataException;
 
     /**
      * Writes the non-key values to multiple pages
      *
      * @param nonKeysColumnWriters non-key values' writers
-     * @return the allocated space for the non-key values' writers
+     * @return length of all columns (that includes pageZero)
      */
     int writeColumns(PriorityQueue<IColumnValuesWriter> nonKeysColumnWriters) 
throws HyracksDataException;
+
+    /**
+     * Close the writer
+     */
+    void close();
 }
diff --git 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
index 6fbdc27016..1d8f6847a4 100644
--- 
a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
+++ 
b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
@@ -31,6 +31,7 @@ import org.apache.asterix.column.values.IColumnValuesWriter;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 
 /**
  * A writer for a batch columns' values
@@ -40,17 +41,19 @@ public final class ColumnBatchWriter implements 
IColumnBatchWriter {
     private final MultiPersistentBufferBytesOutputStream columns;
     private final int pageSize;
     private final double tolerance;
+    private final IColumnWriteContext writeContext;
     private final IReservedPointer columnLengthPointer;
-
     private ByteBuffer pageZero;
     private int columnsOffset;
     private int filtersOffset;
     private int primaryKeysOffset;
     private int nonKeyColumnStartOffset;
 
-    public ColumnBatchWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, 
int pageSize, double tolerance) {
+    public ColumnBatchWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, 
int pageSize, double tolerance,
+            IColumnWriteContext writeContext) {
         this.pageSize = pageSize;
         this.tolerance = tolerance;
+        this.writeContext = writeContext;
         primaryKeys = new ByteBufferOutputStream();
         columns = new MultiPersistentBufferBytesOutputStream(multiPageOpRef);
         columnLengthPointer = columns.createPointer();
@@ -74,39 +77,48 @@ public final class ColumnBatchWriter implements 
IColumnBatchWriter {
     }
 
     @Override
-    public int writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) 
throws HyracksDataException {
-        int allocatedSpace = 0;
+    public void writePrimaryKeyColumns(IColumnValuesWriter[] 
primaryKeyWriters) throws HyracksDataException {
         for (int i = 0; i < primaryKeyWriters.length; i++) {
             IColumnValuesWriter writer = primaryKeyWriters[i];
             setColumnOffset(i, primaryKeysOffset + primaryKeys.size());
             writer.flush(primaryKeys);
-            allocatedSpace += writer.getAllocatedSpace();
         }
-        return allocatedSpace;
     }
 
     @Override
     public int writeColumns(PriorityQueue<IColumnValuesWriter> 
nonKeysColumnWriters) throws HyracksDataException {
-        int allocatedSpace = 0;
         columns.reset();
         while (!nonKeysColumnWriters.isEmpty()) {
             IColumnValuesWriter writer = nonKeysColumnWriters.poll();
             writeColumn(writer);
-            allocatedSpace += writer.getAllocatedSpace();
         }
-        return allocatedSpace;
+
+        // compute the final length
+        int totalLength = nonKeyColumnStartOffset + columns.size();
+        // reset to ensure the last buffer's position and limit are set 
appropriately
+        columns.reset();
+        return totalLength;
+    }
+
+    @Override
+    public void close() {
+        writeContext.close();
     }
 
     private void writeColumn(IColumnValuesWriter writer) throws 
HyracksDataException {
+        boolean overlapping = true;
         if (!hasEnoughSpace(columns.getCurrentBufferPosition(), writer)) {
             /*
              * We reset the columns stream to write all pages and confiscate a 
new buffer to minimize splitting
              * the columns value into multiple pages.
              */
+            overlapping = false;
             nonKeyColumnStartOffset += columns.capacity();
             columns.reset();
         }
 
+        int columnIndex = writer.getColumnIndex();
+        writeContext.startWritingColumn(columnIndex, overlapping);
         int columnRelativeOffset = columns.size();
         columns.reserveInteger(columnLengthPointer);
         setColumnOffset(writer.getColumnIndex(), nonKeyColumnStartOffset + 
columnRelativeOffset);
@@ -116,10 +128,19 @@ public final class ColumnBatchWriter implements 
IColumnBatchWriter {
 
         int length = columns.size() - columnRelativeOffset;
         columnLengthPointer.setInteger(length);
+        writeContext.endWritingColumn(columnIndex, length);
     }
 
     private boolean hasEnoughSpace(int bufferPosition, IColumnValuesWriter 
columnWriter) {
-        //Estimated size mostly overestimate the size
+        if (bufferPosition == 0) {
+            // if the current buffer is empty, then use it
+            return true;
+        } else if (tolerance == 1.0d) {
+            // if tolerance is 100%, then it should avoid doing any 
calculations and start with a new page
+            return false;
+        }
+
+        // Estimated size mostly overestimates the size
         int columnSize = columnWriter.getEstimatedSize();
         float remainingPercentage = (pageSize - bufferPosition) / (float) 
pageSize;
         if (columnSize > pageSize) {
diff --git 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index c4f8727b89..55097c5d3e 100644
--- 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -62,6 +62,7 @@ import 
org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 import 
org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write.DefaultColumnWriteContext;
 import org.apache.hyracks.util.StorageUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -151,7 +152,7 @@ public abstract class AbstractBytesTest extends TestBase {
             int numberOfTuplesToWrite) throws IOException {
         IColumnWriteMultiPageOp multiPageOp = 
columnMetadata.getMultiPageOpRef().getValue();
         FlushColumnTupleWriter writer = new 
FlushColumnTupleWriter(columnMetadata, PAGE_SIZE, MAX_NUMBER_OF_TUPLES,
-                TOLERANCE, MAX_LEAF_NODE_SIZE);
+                TOLERANCE, MAX_LEAF_NODE_SIZE, 
DefaultColumnWriteContext.INSTANCE);
 
         try {
             return writeTuples(fileId, writer, records, numberOfTuplesToWrite, 
multiPageOp);
diff --git 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushLargeTest.java
 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushLargeTest.java
index 6a0256c353..296a09f67b 100644
--- 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushLargeTest.java
+++ 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushLargeTest.java
@@ -34,6 +34,7 @@ import 
org.apache.asterix.common.exceptions.NoOpWarningCollector;
 import org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -63,12 +64,12 @@ public class FlushLargeTest extends AbstractBytesTest {
         FlushColumnMetadata columnMetadata = prepareNewFile(fileId);
         List<IValueReference> record = getParsedRecords();
         List<DummyPage> pageZeros = transform(fileId, columnMetadata, record, 
numberOfTuplesToWrite);
-        QueryColumnMetadata readMetadata =
-                QueryColumnMetadata.create(columnMetadata.getDatasetType(), 
columnMetadata.getNumberOfPrimaryKeys(),
-                        columnMetadata.serializeColumnsMetadata(), new 
ColumnValueReaderFactory(),
-                        ValueGetterFactory.INSTANCE, 
ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE,
-                        Collections.emptyMap(), 
NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                        NoOpColumnFilterEvaluatorFactory.INSTANCE, 
NoOpWarningCollector.INSTANCE, null);
+        QueryColumnMetadata readMetadata = 
QueryColumnMetadata.create(columnMetadata.getDatasetType(),
+                columnMetadata.getNumberOfPrimaryKeys(), 
columnMetadata.serializeColumnsMetadata(),
+                new ColumnValueReaderFactory(), ValueGetterFactory.INSTANCE,
+                ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE, 
Collections.emptyMap(),
+                NoOpColumnFilterEvaluatorFactory.INSTANCE, 
NoOpColumnFilterEvaluatorFactory.INSTANCE,
+                NoOpWarningCollector.INSTANCE, null, 
ColumnProjectorType.QUERY);
         writeResult(fileId, readMetadata, pageZeros);
         testCase.compareRepeated(numberOfTuplesToWrite);
     }
diff --git 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushSmallTest.java
 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushSmallTest.java
index 8b45142c78..463b89f8e1 100644
--- 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushSmallTest.java
+++ 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushSmallTest.java
@@ -34,6 +34,7 @@ import 
org.apache.asterix.common.exceptions.NoOpWarningCollector;
 import org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -62,12 +63,12 @@ public class FlushSmallTest extends AbstractBytesTest {
         FlushColumnMetadata columnMetadata = prepareNewFile(fileId);
         List<IValueReference> record = getParsedRecords();
         List<DummyPage> pageZeros = transform(fileId, columnMetadata, record, 
record.size());
-        QueryColumnMetadata readMetadata =
-                QueryColumnMetadata.create(columnMetadata.getDatasetType(), 
columnMetadata.getNumberOfPrimaryKeys(),
-                        columnMetadata.serializeColumnsMetadata(), new 
ColumnValueReaderFactory(),
-                        ValueGetterFactory.INSTANCE, 
ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE,
-                        Collections.emptyMap(), 
NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                        NoOpColumnFilterEvaluatorFactory.INSTANCE, 
NoOpWarningCollector.INSTANCE, null);
+        QueryColumnMetadata readMetadata = 
QueryColumnMetadata.create(columnMetadata.getDatasetType(),
+                columnMetadata.getNumberOfPrimaryKeys(), 
columnMetadata.serializeColumnsMetadata(),
+                new ColumnValueReaderFactory(), ValueGetterFactory.INSTANCE,
+                ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE, 
Collections.emptyMap(),
+                NoOpColumnFilterEvaluatorFactory.INSTANCE, 
NoOpColumnFilterEvaluatorFactory.INSTANCE,
+                NoOpWarningCollector.INSTANCE, null, 
ColumnProjectorType.QUERY);
         writeResult(fileId, readMetadata, pageZeros);
         testCase.compare();
     }
diff --git 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
index 80106dcebe..ab06484a2d 100644
--- 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
+++ 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
@@ -49,6 +49,7 @@ import 
org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -99,7 +100,7 @@ public class AssemblerTest extends AbstractDummyTest {
                 columnMetadata.getNumberOfPrimaryKeys(), 
columnMetadata.serializeColumnsMetadata(), readerFactory,
                 DummyValueGetterFactory.INSTANCE, 
ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE, Collections.emptyMap(),
                 NoOpColumnFilterEvaluatorFactory.INSTANCE, 
NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                NoOpWarningCollector.INSTANCE, null);
+                NoOpWarningCollector.INSTANCE, null, 
ColumnProjectorType.QUERY);
         AbstractBytesInputStream[] streams = new 
AbstractBytesInputStream[columnMetadata.getNumberOfColumns()];
         Arrays.fill(streams, DummyBytesInputStream.INSTANCE);
 
diff --git 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
index eab824aa16..234f804a98 100644
--- 
a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
+++ 
b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
@@ -33,16 +33,21 @@ public class NoOpColumnBatchWriter implements 
IColumnBatchWriter {
 
     @Override
     public void setPageZeroBuffer(ByteBuffer pageZeroBuffer, int 
numberOfColumns, int numberOfPrimaryKeys) {
-
+        // NoOp
     }
 
     @Override
-    public int writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) 
throws HyracksDataException {
-        return 0;
+    public void writePrimaryKeyColumns(IColumnValuesWriter[] 
primaryKeyWriters) throws HyracksDataException {
+        // NoOp
     }
 
     @Override
     public int writeColumns(PriorityQueue<IColumnValuesWriter> 
nonKeysColumnWriters) throws HyracksDataException {
         return 0;
     }
+
+    @Override
+    public void close() {
+        // NoOp
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
index 43aa8730dd..bbd8d87eda 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
@@ -92,6 +92,11 @@
       <artifactId>hyracks-control-nc</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-cloud</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
@@ -104,5 +109,10 @@
       <groupId>it.unimi.dsi</groupId>
       <artifactId>fastutil-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java
similarity index 53%
copy from 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
copy to 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java
index 641f704f5e..29258b5b70 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java
@@ -18,34 +18,8 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.column.api.projection;
 
-/**
- * Gets information about the requested columns
- */
-public interface IColumnProjectionInfo {
-    /**
-     * @param ordinal position of the requested column
-     * @return column index given the ordinal number of the requested column
-     */
-    int getColumnIndex(int ordinal);
-
-    /**
-     * @return total number of requested columns
-     */
-    int getNumberOfProjectedColumns();
-
-    /**
-     * @return number of primary keys
-     */
-    int getNumberOfPrimaryKeys();
-
-    /**
-     * @param ordinal position of the filtered column
-     * @return column index given the ordinal number of the filtered column
-     */
-    int getFilteredColumnIndex(int ordinal);
-
-    /**
-     * @return number of filtered columns
-     */
-    int getNumberOfFilteredColumns();
+public enum ColumnProjectorType {
+    MERGE,
+    QUERY,
+    MODIFY
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
index 641f704f5e..b1bfe87546 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
@@ -48,4 +48,9 @@ public interface IColumnProjectionInfo {
      * @return number of filtered columns
      */
     int getNumberOfFilteredColumns();
+
+    /**
+     * @return the type of {@link IColumnTupleProjector} that created this 
projection info
+     */
+    ColumnProjectorType getProjectorType();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
new file mode 100644
index 0000000000..38a1713eeb
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.sweeper.SweepContext;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudColumnReadContext;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.DefaultColumnReadContext;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write.CloudColumnWriteContext;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeper;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.ISweepContext;
+
+/**
+ * The disk manager cannot be shared among different partitions as columns are 
local to each partition.
+ * For example, column 9 in partition 0 corresponds to "salary" while column 9 
in partition 1 corresponds to "age".
+ */
+public final class CloudColumnIndexDiskCacheManager implements 
IColumnIndexDiskCacheManager {
+    private final IColumnTupleProjector sweepProjector;
+    private final IPhysicalDrive drive;
+    private final ColumnSweepPlanner planner;
+    private final ColumnSweeper sweeper;
+
+    public CloudColumnIndexDiskCacheManager(int numberOfPrimaryKeys, 
IColumnTupleProjector sweepProjector,
+            IPhysicalDrive drive) {
+        this.sweepProjector = sweepProjector;
+        this.drive = drive;
+        planner = new ColumnSweepPlanner(numberOfPrimaryKeys, 
System::nanoTime);
+        sweeper = new ColumnSweeper(numberOfPrimaryKeys);
+    }
+
+    @Override
+    public void activate(int numberOfColumns, List<ILSMDiskComponent> 
diskComponents, IBufferCache bufferCache)
+            throws HyracksDataException {
+        planner.onActivate(numberOfColumns, diskComponents, sweepProjector, 
bufferCache);
+    }
+
+    @Override
+    public IColumnWriteContext createWriteContext(int numberOfColumns, 
LSMIOOperationType operationType) {
+        return new CloudColumnWriteContext(drive, planner, numberOfColumns);
+    }
+
+    @Override
+    public IColumnReadContext createReadContext(IColumnProjectionInfo 
projectionInfo) {
+        ColumnProjectorType projectorType = projectionInfo.getProjectorType();
+        if (projectorType == ColumnProjectorType.QUERY) {
+            planner.access(projectionInfo, drive.hasSpace());
+        } else if (projectorType == ColumnProjectorType.MODIFY) {
+            planner.setIndexedColumns(projectionInfo);
+            // Requested (and indexed) columns will be persisted if space 
permits
+            return DefaultColumnReadContext.INSTANCE;
+        }
+        return new CloudColumnReadContext(projectionInfo, drive, 
planner.getPlanCopy());
+    }
+
+    @Override
+    public boolean isActive() {
+        return planner.isActive();
+    }
+
+    @Override
+    public boolean isSweepable() {
+        return true;
+    }
+
+    @Override
+    public boolean prepareSweepPlan() {
+        return planner.plan();
+    }
+
+    @Override
+    public long sweep(ISweepContext context) throws HyracksDataException {
+        SweepContext sweepContext = (SweepContext) context;
+        return sweeper.sweep(planner.getPlanCopy(), sweepContext, 
sweepProjector);
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
new file mode 100644
index 0000000000..9fc60fa1a5
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
+
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeperUtil.EMPTY;
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnPageIndex;
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfPages;
+
+import java.util.BitSet;
+
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudColumnReadContext;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeper;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.longs.LongArrays;
+import it.unimi.dsi.fastutil.longs.LongComparator;
+
+/**
+ * Computes column offsets, lengths (in bytes), and page ranges for a single column mega
+ * leaf node. From those ranges it derives which pages are evictable, cloud-only, or must
+ * not be evicted, for use by {@link ColumnSweepPlanner}, {@link ColumnSweeper}, and
+ * {@link CloudColumnReadContext}.
+ */
+public final class ColumnRanges {
+    // Orders packed (offset, columnIndex) pairs by offset (the upper 32 bits of each pair)
+    private static final LongComparator OFFSET_COMPARATOR =
+            (x, y) -> Integer.compare(getOffsetFromPair(x), getOffsetFromPair(y));
+    private final int numberOfPrimaryKeys;
+
+    // For eviction: pages of non-requested columns, which must be kept locally
+    private final BitSet nonEvictablePages;
+
+    // For Query: pages that are (or will be) evicted, and pages locked to cloud-only reads
+    private final BitSet evictablePages;
+    private final BitSet cloudOnlyPages;
+
+    private ColumnBTreeReadLeafFrame leafFrame;
+    // Packed pairs: column offset in the upper 32 bits, column index in the lower 32 bits
+    private long[] offsetColumnIndexPairs;
+    // Column length in bytes, indexed by column index (0 for primary-key columns)
+    private int[] lengths;
+    // Requested columns in ascending-offset order, terminated by -1
+    private int[] columnsOrder;
+    // Page ID of page zero of the current mega leaf node
+    private int pageZeroId;
+
+    public ColumnRanges(int numberOfPrimaryKeys) {
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+
+        offsetColumnIndexPairs = new long[0];
+        lengths = new int[0];
+        columnsOrder = new int[0];
+
+        nonEvictablePages = new BitSet();
+
+        evictablePages = new BitSet();
+        cloudOnlyPages = new BitSet();
+    }
+
+    /**
+     * @return number of primary keys
+     */
+    public int getNumberOfPrimaryKeys() {
+        return numberOfPrimaryKeys;
+    }
+
+    /**
+     * Reset ranges for initializing {@link ColumnSweepPlanner}
+     *
+     * @param leafFrame to compute the ranges for
+     */
+    public void reset(ColumnBTreeReadLeafFrame leafFrame) {
+        reset(leafFrame, EMPTY, EMPTY, EMPTY);
+    }
+
+    /**
+     * Reset column ranges for {@link ColumnSweeper}
+     *
+     * @param leafFrame to compute the ranges for
+     * @param plan      eviction plan
+     */
+    public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet plan) {
+        reset(leafFrame, plan, EMPTY, EMPTY);
+    }
+
+    /**
+     * Reset ranges for {@link CloudColumnReadContext}
+     *
+     * @param leafFrame        to compute the ranges for
+     * @param requestedColumns required columns
+     * @param evictableColumns columns that are or will be evicted
+     * @param cloudOnlyColumns locked columns that cannot be read from a local disk
+     */
+    public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet requestedColumns, BitSet evictableColumns,
+            BitSet cloudOnlyColumns) {
+        // Set leafFrame
+        this.leafFrame = leafFrame;
+        // Ensure arrays capacities (given the leafFrame's columns and pages)
+        init();
+
+        // Get the number of columns in a page
+        int numberOfColumns = leafFrame.getNumberOfColumns();
+        for (int i = 0; i < numberOfColumns; i++) {
+            long offset = leafFrame.getColumnOffset(i);
+            // Set the first 32-bits to the offset and the second 32-bits to columnIndex
+            offsetColumnIndexPairs[i] = (offset << 32) + i;
+        }
+
+        // Set artificial offset to determine the last column's length
+        // NOTE(review): if getMegaLeafNodeLengthInBytes() returns an int, '<< 32' overflows
+        // before widening to long -- confirm it returns long (or cast to long first)
+        offsetColumnIndexPairs[numberOfColumns] = (leafFrame.getMegaLeafNodeLengthInBytes() << 32) + numberOfColumns;
+
+        // Sort the pairs by offset (i.e., lowest offset first). The artificial pair is
+        // excluded from the sort range as it already holds the maximum offset
+        LongArrays.stableSort(offsetColumnIndexPairs, 0, numberOfColumns, OFFSET_COMPARATOR);
+
+        int columnOrdinal = 0;
+        for (int i = 0; i < numberOfColumns; i++) {
+            int columnIndex = getColumnIndexFromPair(offsetColumnIndexPairs[i]);
+            int offset = getOffsetFromPair(offsetColumnIndexPairs[i]);
+            int nextOffset = getOffsetFromPair(offsetColumnIndexPairs[i + 1]);
+
+            // Compute the column's length in bytes (set 0 for PKs)
+            int length = columnIndex < numberOfPrimaryKeys ? 0 : nextOffset - offset;
+            lengths[columnIndex] = length;
+
+            // Get start page ID (given the computed length above)
+            int startPageId = getColumnStartPageIndex(columnIndex);
+            // Get the number of pages (given the computed length above)
+            int numberOfPages = getColumnNumberOfPages(columnIndex);
+
+            if (columnIndex >= numberOfPrimaryKeys && requestedColumns.get(columnIndex)) {
+                // Set column index
+                columnsOrder[columnOrdinal++] = columnIndex;
+                // Compute cloud-only and evictable pages
+                setCloudOnlyAndEvictablePages(columnIndex, cloudOnlyColumns, evictableColumns, startPageId,
+                        numberOfPages);
+                // A requested column. Keep its pages as requested
+                continue;
+            }
+
+            // Mark the page as non-evictable
+            for (int j = startPageId; j < startPageId + numberOfPages; j++) {
+                nonEvictablePages.set(j);
+            }
+        }
+
+        // Bound the nonEvictablePages to the number of pages in the mega leaf node
+        nonEvictablePages.set(leafFrame.getMegaLeafNodeNumberOfPages());
+        // to indicate the end
+        columnsOrder[columnOrdinal] = -1;
+    }
+
+    /**
+     * First page of a column
+     *
+     * @param columnIndex column index
+     * @return pageID
+     */
+    public int getColumnStartPageIndex(int columnIndex) {
+        int pageSize = leafFrame.getBuffer().capacity();
+        return getColumnPageIndex(leafFrame.getColumnOffset(columnIndex), pageSize);
+    }
+
+    /**
+     * Length of a column in pages (at least one page, even for zero-length columns)
+     *
+     * @param columnIndex column index
+     * @return number of pages
+     */
+    public int getColumnNumberOfPages(int columnIndex) {
+        int pageSize = leafFrame.getBuffer().capacity();
+        int numberOfPages = getNumberOfPages(getColumnLength(columnIndex), pageSize);
+        return numberOfPages == 0 ? 1 : numberOfPages;
+    }
+
+    /**
+     * Length of a column in bytes
+     *
+     * @param columnIndex column index
+     * @return number of bytes
+     */
+    public int getColumnLength(int columnIndex) {
+        return lengths[columnIndex];
+    }
+
+    /**
+     * Returns true if the page is meant to be read from the cloud only
+     *
+     * @param pageId page ID
+     * @return true if the page should be read from the cloud, false otherwise
+     * @see #reset(ColumnBTreeReadLeafFrame, BitSet, BitSet, BitSet)
+     */
+    public boolean isCloudOnly(int pageId) {
+        // Compute the relative page ID for this mega leaf node
+        int relativePageId = pageId - pageZeroId;
+        return cloudOnlyPages.get(relativePageId);
+    }
+
+    /**
+     * Whether the page has been or will be evicted
+     *
+     * @param pageId page ID
+     * @return true if the page was or will be evicted, false otherwise
+     */
+    public boolean isEvictable(int pageId) {
+        int relativePageId = pageId - pageZeroId;
+        return evictablePages.get(relativePageId);
+    }
+
+    /**
+     * @return bitset of all pages that must not be evicted (pages of non-requested columns)
+     */
+    public BitSet getNonEvictablePages() {
+        return nonEvictablePages;
+    }
+
+    /**
+     * @return the order of columns that should be read in order to ensure (semi) sequential access.
+     * Sequential means page X is read before page Y, forall X and Y where X < Y.
+     * The returned array is terminated by -1.
+     */
+    public int[] getColumnsOrder() {
+        return columnsOrder;
+    }
+
+    // Resize the reusable arrays (if needed) and clear all per-leaf state
+    private void init() {
+        int numberOfColumns = leafFrame.getNumberOfColumns();
+        offsetColumnIndexPairs = LongArrays.ensureCapacity(offsetColumnIndexPairs, numberOfColumns + 1, 0);
+        lengths = IntArrays.ensureCapacity(lengths, numberOfColumns, 0);
+        columnsOrder = IntArrays.ensureCapacity(columnsOrder, numberOfColumns + 1, 0);
+        nonEvictablePages.clear();
+        evictablePages.clear();
+        cloudOnlyPages.clear();
+        pageZeroId = leafFrame.getPageId();
+    }
+
+    // Upper 32 bits of the pair (column offset)
+    private static int getOffsetFromPair(long pair) {
+        return (int) (pair >> 32);
+    }
+
+    // Lower 32 bits of the pair (column index)
+    private static int getColumnIndexFromPair(long pair) {
+        return (int) pair;
+    }
+
+    // Marks the column's pages as cloud-only or evictable. When a column is both locked
+    // (cloud-only) and evictable, cloud-only takes precedence
+    private void setCloudOnlyAndEvictablePages(int columnIndex, BitSet cloudOnlyColumns, BitSet evictableColumns,
+            int startPageId, int numberOfPages) {
+        if (evictableColumns == EMPTY && cloudOnlyColumns == EMPTY) {
+            return;
+        }
+
+        // Find pages that meant to be read from the cloud only or are evictable
+        boolean cloudOnly = cloudOnlyColumns.get(columnIndex);
+        boolean evictable = evictableColumns.get(columnIndex);
+        if (cloudOnly || evictable) {
+            for (int j = startPageId; j < startPageId + numberOfPages; j++) {
+                if (cloudOnly) {
+                    cloudOnlyPages.set(j);
+                } else {
+                    evictablePages.set(j);
+                }
+            }
+        }
+    }
+
+    // Debug aid: renders a column-by-page occupancy matrix plus the three page bitsets
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        int numberOfPages = leafFrame.getMegaLeafNodeNumberOfPages();
+        builder.append("       ");
+        for (int i = 0; i < numberOfPages; i++) {
+            builder.append(String.format("%02d", i));
+            builder.append("  ");
+        }
+
+        builder.append('\n');
+        for (int i = 0; i < leafFrame.getNumberOfColumns(); i++) {
+            builder.append(String.format("%03d", i));
+            builder.append(":");
+            int startPageId = getColumnStartPageIndex(i);
+            int columnPagesCount = getColumnNumberOfPages(i);
+            printColumnPages(builder, numberOfPages, startPageId, columnPagesCount);
+        }
+
+        builder.append("nonEvictablePages: ");
+        builder.append(nonEvictablePages);
+        builder.append('\n');
+        builder.append("evictablePages: ");
+        builder.append(evictablePages);
+        builder.append('\n');
+        builder.append("cloudOnlyPages: ");
+        builder.append(cloudOnlyPages);
+
+        return builder.toString();
+    }
+
+    // Prints one row of the occupancy matrix: 1 where the column occupies the page, else 0
+    private void printColumnPages(StringBuilder builder, int numberOfPages, int startPageId, int columnPagesCount) {
+        for (int i = 0; i < numberOfPages; i++) {
+            builder.append("   ");
+            if (i >= startPageId && i < startPageId + columnPagesCount) {
+                builder.append(1);
+            } else {
+                builder.append(0);
+            }
+        }
+        builder.append('\n');
+    }
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
new file mode 100644
index 0000000000..af69fe5f8d
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import static 
org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext.readAndPersistPage;
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.buffercache.page.ISweepLockInfo;
+import org.apache.hyracks.control.nc.io.IOManager;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepLockInfo;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import 
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+
+/**
+ * Cloud-aware column read context. Pins the pages of the projected columns, coalescing
+ * contiguous page ranges into single (possibly cloud-backed) reads via
+ * {@link CloudMegaPageReadContext}, while honoring the sweep plan (evictable and
+ * cloud-only pages).
+ */
+@NotThreadSafe
+public final class CloudColumnReadContext implements IColumnReadContext {
+    private final ColumnProjectorType operation;
+    private final IPhysicalDrive drive;
+    // Copy of the eviction plan: columns that are (or will be) evicted
+    private final BitSet plan;
+    // Columns locked by an ongoing sweep; their pages must be read from the cloud only
+    private final BitSet cloudOnlyColumns;
+    private final ColumnRanges columnRanges;
+    // Context used when pinning contiguous ranges of column pages
+    private final CloudMegaPageReadContext columnCtx;
+    private final List<ICachedPage> pinnedPages;
+    private final BitSet projectedColumns;
+
+    public CloudColumnReadContext(IColumnProjectionInfo projectionInfo, IPhysicalDrive drive, BitSet plan) {
+        this.operation = projectionInfo.getProjectorType();
+        this.drive = drive;
+        this.plan = plan;
+        columnRanges = new ColumnRanges(projectionInfo.getNumberOfPrimaryKeys());
+        cloudOnlyColumns = new BitSet();
+        columnCtx = new CloudMegaPageReadContext(operation, columnRanges, drive);
+        pinnedPages = new ArrayList<>();
+        projectedColumns = new BitSet();
+        if (operation == QUERY || operation == MODIFY) {
+            // Collect the requested columns; negative indices are skipped
+            // (presumably columns absent from this component -- confirm)
+            for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); i++) {
+                int columnIndex = projectionInfo.getColumnIndex(i);
+                if (columnIndex >= 0) {
+                    projectedColumns.set(columnIndex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void onPin(ICachedPage page) {
+        CloudCachedPage cloudPage = (CloudCachedPage) page;
+        ISweepLockInfo lockTest = cloudPage.beforeRead();
+        // If the page is locked by an ongoing sweep, record which columns are locked so
+        // their pages are read from the cloud only (they can be evicted at any moment)
+        if (lockTest.isLocked()) {
+            ColumnSweepLockInfo lockedColumns = (ColumnSweepLockInfo) lockTest;
+            lockedColumns.getLockedColumns(cloudOnlyColumns);
+        }
+    }
+
+    @Override
+    public void onUnpin(ICachedPage page) {
+        CloudCachedPage cloudPage = (CloudCachedPage) page;
+        cloudPage.afterRead();
+    }
+
+    @Override
+    public boolean isNewPage() {
+        return false;
+    }
+
+    @Override
+    public boolean incrementStats() {
+        return true;
+    }
+
+    @Override
+    public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+            CachedPage cPage) throws HyracksDataException {
+        // Page zero will be persisted (always) if free space permits
+        return readAndPersistPage(ioManager, fileHandle, header, cPage, drive.hasSpace());
+    }
+
+    @Override
+    public ICachedPage pinNext(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+            throws HyracksDataException {
+        // TODO do we support prefetching?
+        // Pin the next mega leaf node's page zero
+        ICachedPage nextPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, leafFrame.getNextLeaf()), this);
+        // Release the column pages pinned for the current mega leaf node
+        release(bufferCache);
+        // Unpin the current page zero and switch the frame to the next one
+        bufferCache.unpin(leafFrame.getPage(), this);
+        leafFrame.setPage(nextPage);
+        return nextPage;
+    }
+
+    @Override
+    public void prepareColumns(ColumnBTreeReadLeafFrame leafFrame, IBufferCache bufferCache, int fileId)
+            throws HyracksDataException {
+        if (leafFrame.getTupleCount() == 0) {
+            return;
+        }
+
+        // TODO handle prefetch if supported
+
+        // Compute the page ranges of the projected columns given the eviction plan and
+        // the columns locked by the sweeper
+        columnRanges.reset(leafFrame, projectedColumns, plan, cloudOnlyColumns);
+        int pageZeroId = leafFrame.getPageId();
+        int[] columnsOrders = columnRanges.getColumnsOrder();
+        int i = 0;
+        int columnIndex = columnsOrders[i];
+        // columnsOrders is terminated by -1
+        while (columnIndex > -1) {
+            if (columnIndex < columnRanges.getNumberOfPrimaryKeys()) {
+                // Primary keys reside in page zero, which is already pinned
+                columnIndex = columnsOrders[++i];
+                continue;
+            }
+
+            int startPageId = columnRanges.getColumnStartPageIndex(columnIndex);
+            // Will increment the number pages if the next column's pages are contiguous to this column's pages
+            int numberOfPages = columnRanges.getColumnNumberOfPages(columnIndex);
+
+            // Advance to the next column to check if it has contiguous pages
+            columnIndex = columnsOrders[++i];
+            while (columnIndex > -1) {
+                // Get the next column's start page ID
+                int nextStartPageId = columnRanges.getColumnStartPageIndex(columnIndex);
+                // NOTE(review): the '+ 1' tolerates a one-page gap between columns
+                // (coalescing the two ranges into one read) -- confirm this is intended
+                if (nextStartPageId > startPageId + numberOfPages + 1) {
+                    // The next startPageId is not contiguous, stop.
+                    break;
+                }
+
+                // Last page of this column
+                int nextLastPage = nextStartPageId + columnRanges.getColumnNumberOfPages(columnIndex);
+                // The next column's pages are contiguous. Combine its ranges with the previous one.
+                numberOfPages = nextLastPage - startPageId;
+                // Advance to the next column
+                columnIndex = columnsOrders[++i];
+            }
+
+            // Pin the coalesced range as one unit (a single cloud read if pages are missing)
+            columnCtx.prepare(numberOfPages);
+            pin(bufferCache, fileId, pageZeroId, startPageId, numberOfPages);
+        }
+    }
+
+    // Pins numOfRequestedPages pages starting at (pageZeroId + start) using columnCtx
+    private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start, int numOfRequestedPages)
+            throws HyracksDataException {
+        for (int i = start; i < start + numOfRequestedPages; i++) {
+            long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId + i);
+            pinnedPages.add(bufferCache.pin(dpid, columnCtx));
+        }
+    }
+
+    @Override
+    public void release(IBufferCache bufferCache) throws HyracksDataException {
+        // Release might differ in the future if prefetching is supported
+        close(bufferCache);
+    }
+
+    @Override
+    public void close(IBufferCache bufferCache) throws HyracksDataException {
+        // Unpin all column pages and close any open cloud stream
+        release(pinnedPages, bufferCache, columnCtx);
+        columnCtx.close();
+    }
+
+    private static void release(List<ICachedPage> pinnedPages, IBufferCache bufferCache, IBufferCacheReadContext ctx)
+            throws HyracksDataException {
+        for (int i = 0; i < pinnedPages.size(); i++) {
+            bufferCache.unpin(pinnedPages.get(i), ctx);
+        }
+        pinnedPages.clear();
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
new file mode 100644
index 0000000000..0f0b0b9506
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
+import static 
org.apache.hyracks.storage.common.buffercache.IBufferCache.RESERVED_HEADER_BYTES;
+import static 
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import 
org.apache.hyracks.cloud.buffercache.context.BufferCacheCloudReadContextUtil;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import 
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Read context for pinning a contiguous range of column pages. Pages missing locally are
+ * read from a single cloud stream covering the remainder of the range; each page is then
+ * optionally persisted to the local drive depending on the sweep plan and free space.
+ */
+@NotThreadSafe
+final class CloudMegaPageReadContext implements IBufferCacheReadContext {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ColumnProjectorType operation;
+    private final ColumnRanges columnRanges;
+    private final IPhysicalDrive drive;
+    // Number of pages in the contiguous range currently being pinned
+    private int numberOfContiguousPages;
+    // Number of pages of the current range processed so far
+    private int pageCounter;
+    // Cloud stream covering the remaining pages of the current range (null if none open)
+    private InputStream gapStream;
+
+    CloudMegaPageReadContext(ColumnProjectorType operation, ColumnRanges columnRanges, IPhysicalDrive drive) {
+        this.operation = operation;
+        this.columnRanges = columnRanges;
+        this.drive = drive;
+    }
+
+    // Starts a new contiguous range; closes any stream left over from the previous range
+    public void prepare(int numberOfContiguousPages) throws HyracksDataException {
+        close();
+        this.numberOfContiguousPages = numberOfContiguousPages;
+        pageCounter = 0;
+    }
+
+    @Override
+    public void onPin(ICachedPage page) throws HyracksDataException {
+        CloudCachedPage cachedPage = (CloudCachedPage) page;
+        if (gapStream != null && cachedPage.skipCloudStream()) {
+            /*
+             * This page is requested but the buffer cache has a valid copy in memory. Also, the page itself was
+             * requested to be read from the cloud. Since this page is valid, no buffer cache read() will be performed.
+             * As the buffer cache read() is also responsible for persisting the bytes read from the cloud, we can end
+             * up writing the bytes of this page in the position of another page. Therefore, we should skip the bytes
+             * for this particular page to avoid placing the bytes of this page into another page's position.
+             */
+            try {
+                long remaining = cachedPage.getCompressedPageSize();
+                while (remaining > 0) {
+                    remaining -= gapStream.skip(remaining);
+                }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+    @Override
+    public void onUnpin(ICachedPage page) {
+        // NoOp
+    }
+
+    @Override
+    public boolean isNewPage() {
+        return false;
+    }
+
+    @Override
+    public boolean incrementStats() {
+        return true;
+    }
+
+    @Override
+    public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+            CachedPage cPage) throws HyracksDataException {
+        boolean empty = BufferCacheCloudReadContextUtil.isEmpty(header);
+        int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
+        boolean cloudOnly = columnRanges.isCloudOnly(pageId);
+        ByteBuffer buffer;
+        if (empty || cloudOnly || gapStream != null) {
+            boolean evictable = columnRanges.isEvictable(pageId);
+            /*
+             * Persist iff the following conditions are satisfied:
+             * - The page is empty (not present locally)
+             * - The page is not being evicted (cloudOnly)
+             * - The page is not planned for eviction (evictable)
+             * - The operation is not a merge operation (the component will be deleted anyway)
+             * - The disk has space
+             *
+             * Note: 'empty' can be false while 'cloudOnly' is true. We cannot read from disk as the page can be
+             * evicted at any moment. In other words, the sweeper told us that it is going to evict this page; hence
+             * 'cloudOnly' is true.
+             */
+            boolean persist = empty && !cloudOnly && !evictable && operation != MERGE && drive.hasSpace();
+            buffer = readFromStream(ioManager, fileHandle, header, cPage, persist);
+            // Skip the header; the caller expects the buffer positioned at the page payload
+            buffer.position(RESERVED_HEADER_BYTES);
+        } else {
+            /*
+             * Here we can find a page that is planned for eviction, but it has not been evicted yet
+             * (i.e., empty = false). This could happen if the cursor is at a point the sweeper hasn't
+             * reached yet (i.e., cloudOnly = false). In that case, read it from the local disk.
+             */
+            buffer = DEFAULT.processHeader(ioManager, fileHandle, header, cPage);
+        }
+
+        // Finished the current contiguous range; close any open cloud stream
+        if (++pageCounter == numberOfContiguousPages) {
+            close();
+        }
+
+        return buffer;
+    }
+
+    // Closes the gap stream (if open); idempotent
+    void close() throws HyracksDataException {
+        if (gapStream != null) {
+            try {
+                gapStream.close();
+                gapStream = null;
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+
+    // Fills the header buffer with one page's bytes from the cloud stream and optionally
+    // persists them to the local file at the page's offset
+    private ByteBuffer readFromStream(IOManager ioManager, BufferedFileHandle fileHandle,
+            BufferCacheHeaderHelper header, CachedPage cPage, boolean persist) throws HyracksDataException {
+        InputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
+        ByteBuffer buffer = header.getBuffer();
+        buffer.position(0);
+        try {
+            // Keep reading until the buffer is full; a single read() may return fewer bytes
+            while (buffer.remaining() != 0) {
+                int length = stream.read(buffer.array(), buffer.position(), buffer.remaining());
+                if (length < 0) {
+                    throw new IllegalStateException("Stream should not be empty!");
+                }
+                buffer.position(buffer.position() + length);
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+
+        buffer.flip();
+
+        if (persist) {
+            long offset = cPage.getCompressedPageOffset();
+            ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+            BufferCacheCloudReadContextUtil.persist(cloudIOManager, fileHandle.getFileHandle(), buffer, offset);
+        }
+
+        return buffer;
+    }
+
+    // Returns the open gap stream, or opens one spanning the remaining pages of the range
+    private InputStream getOrCreateStream(IOManager ioManager, BufferedFileHandle fileHandle, CachedPage cPage)
+            throws HyracksDataException {
+        if (gapStream != null) {
+            return gapStream;
+        }
+
+        // NOTE(review): info-level logging on every cloud gap read may be noisy on hot
+        // query paths -- consider debug level
+        LOGGER.info("Cloud stream read for {} pages", numberOfContiguousPages - pageCounter);
+        int requiredNumOfPages = numberOfContiguousPages - pageCounter;
+        long offset = cPage.getCompressedPageOffset();
+        int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
+        long length = fileHandle.getPagesTotalSize(pageId, requiredNumOfPages);
+
+        ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+        gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(), offset, length);
+
+        return gapStream;
+    }
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
new file mode 100644
index 0000000000..dcde8336a0
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import 
org.apache.hyracks.cloud.buffercache.context.DefaultCloudOnlyWriteContext;
+import org.apache.hyracks.control.nc.io.IOManager;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import 
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
+import 
org.apache.hyracks.storage.common.buffercache.context.write.DefaultBufferCacheWriteContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.ints.IntSet;
+
+/**
+ * Cloud-aware {@link IColumnWriteContext} that decides, per column, whether pages are persisted
+ * locally (the default write context) or written to the cloud only, based on the planner's
+ * eviction plan, the set of indexed columns, and available local disk space. Observed column
+ * sizes are reported back to the {@link ColumnSweepPlanner} on {@link #close()}.
+ */
+public final class CloudColumnWriteContext implements IColumnWriteContext {
+    private static final int INITIAL_NUMBER_OF_COLUMNS = 32;
+    private final IPhysicalDrive drive;
+    private final ColumnSweepPlanner planner;
+    // Snapshot of the planner's eviction plan (columns that should not be persisted locally)
+    private final BitSet plan;
+    // Snapshot of columns used by secondary indexes (always persisted locally when possible)
+    private final IntSet indexedColumns;
+    // Max observed size per column index; reported to the planner on close()
+    private int[] sizes;
+    private int numberOfColumns;
+    // Delegate that performs the actual page writes (local+cloud vs. cloud-only)
+    private IBufferCacheWriteContext currentContext;
+    /**
+     * When {@code true}, the next call to write() will persist the page locally and then swap to
+     * the cloud-only writer (i.e., the following pages will be written in the cloud but not
+     * locally).
+     * <p>
+     * This flag is set to 'true' iff the previous column's last page should be persisted AND it is
+     * 'overlapping' with this column's first page.
+     */
+    private boolean writeLocallyAndSwitchToCloudOnly;
+
+    public CloudColumnWriteContext(IPhysicalDrive drive, ColumnSweepPlanner 
planner, int numberOfColumns) {
+        this.drive = drive;
+        this.planner = planner;
+        this.plan = planner.getPlanCopy();
+        this.indexedColumns = planner.getIndexedColumnsCopy();
+        // When numberOfColumns equals the PK count (flush case), the real column count is unknown:
+        // start with a default capacity and grow lazily in ensureCapacity()
+        int initialLength =
+                planner.getNumberOfPrimaryKeys() == numberOfColumns ? 
INITIAL_NUMBER_OF_COLUMNS : numberOfColumns;
+        sizes = new int[initialLength];
+        // Number of columns is not known during the flush operation
+        this.numberOfColumns = 0;
+        currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+    }
+
+    @Override
+    public void startWritingColumn(int columnIndex, boolean overlapping) {
+        if (drive.hasSpace() || indexedColumns.contains(columnIndex)) {
+            // The current column should be persisted locally if free disk 
space permits
+            currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+        } else if (plan.get(columnIndex)) {
+            // This column was planned for eviction, do not persist.
+            if (overlapping && currentContext == 
DefaultBufferCacheWriteContext.INSTANCE) {
+                // The previous column's last page should be persisted AND it 
is overlapping with the current column's
+                // first page. Persist the first page locally and switch to 
cloud-only writer.
+                writeLocallyAndSwitchToCloudOnly = true;
+            } else {
+                // The previous column's last page is not overlapping. Switch 
to cloud-only writer (if not already)
+                currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+            }
+        } else {
+            // Local drive is pressured. Write to cloud only.
+            currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+        }
+    }
+
+    @Override
+    public void endWritingColumn(int columnIndex, int size) {
+        // Track the largest size seen for this column across all mega leaf nodes
+        ensureCapacity(columnIndex);
+        sizes[columnIndex] = Math.max(sizes[columnIndex], size);
+    }
+
+    @Override
+    public void columnsPersisted() {
+        // Set the default writer context to persist pageZero and the interior 
nodes' pages locally
+        currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+        writeLocallyAndSwitchToCloudOnly = false;
+    }
+
+    @Override
+    public void close() {
+        // Report the sizes of the written columns
+        planner.adjustColumnSizes(sizes, numberOfColumns);
+    }
+
+    /*
+     * 
************************************************************************************************
+     * WRITE methods
+     * 
************************************************************************************************
+     */
+
+    /** Writes one page via the current delegate, then switches to cloud-only if scheduled. */
+    @Override
+    public int write(IOManager ioManager, IFileHandle handle, long offset, 
ByteBuffer data)
+            throws HyracksDataException {
+        int writtenBytes = currentContext.write(ioManager, handle, offset, 
data);
+        switchIfNeeded();
+        return writtenBytes;
+    }
+
+    /** Vectored variant of {@link #write(IOManager, IFileHandle, long, ByteBuffer)}. */
+    @Override
+    public long write(IOManager ioManager, IFileHandle handle, long offset, 
ByteBuffer[] data)
+            throws HyracksDataException {
+        long writtenBytes = currentContext.write(ioManager, handle, offset, 
data);
+        switchIfNeeded();
+        return writtenBytes;
+    }
+
+    /*
+     * 
************************************************************************************************
+     * helper methods
+     * 
************************************************************************************************
+     */
+
+    // Grows the sizes array as new (higher) column indexes are observed and tracks the count
+    private void ensureCapacity(int columnIndex) {
+        int length = sizes.length;
+        if (columnIndex >= length) {
+            sizes = IntArrays.grow(sizes, columnIndex + 1);
+        }
+
+        numberOfColumns = Math.max(numberOfColumns, columnIndex + 1);
+    }
+
+    // Performs the deferred local-write-then-cloud-only swap scheduled by startWritingColumn()
+    private void switchIfNeeded() {
+        if (writeLocallyAndSwitchToCloudOnly) {
+            // Switch to cloud-only writer
+            currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+            writeLocallyAndSwitchToCloudOnly = false;
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java
new file mode 100644
index 0000000000..ed83e84531
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.cloud.buffercache.page.ISweepLockInfo;
+
+/**
+ * Sweep-lock payload holding the set of columns locked by an in-progress sweep. Readers can copy
+ * the locked set (via {@link #getLockedColumns(BitSet)}) to avoid relying on column pages that are
+ * concurrently being punched out of local disk.
+ */
+public final class ColumnSweepLockInfo implements ISweepLockInfo {
+    private final BitSet lockedColumns;
+
+    public ColumnSweepLockInfo() {
+        lockedColumns = new BitSet();
+    }
+
+    /**
+     * Reset the lock with plan's columns
+     *
+     * @param plan contains the columns to be locked
+     */
+    void reset(BitSet plan) {
+        lockedColumns.clear();
+        lockedColumns.or(plan);
+    }
+
+    /**
+     * Clear and set the locked columns in the provided {@link BitSet}
+     *
+     * @param lockedColumns used to get the locked columns
+     */
+    public void getLockedColumns(BitSet lockedColumns) {
+        lockedColumns.clear();
+        lockedColumns.or(this.lockedColumns);
+    }
+
+    @Override
+    public boolean isLocked() {
+        // An instance of this class only exists while a sweep holds the lock; always locked.
+        return true;
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
new file mode 100644
index 0000000000..9549f4f7c6
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static 
org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils.isMergedComponent;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.longs.LongArrays;
+
+/**
+ * Plans which columns of a columnar LSM index are candidates for eviction ("punching") from local
+ * disk. The planner tracks per-column sizes and last-access times and combines them into a weight
+ * (larger and colder columns score higher); columns whose weight reaches {@code punchableThreshold}
+ * are added to the plan. Primary-key and indexed columns are never planned for eviction.
+ * Mutating entry points are {@code synchronized}.
+ */
+public final class ColumnSweepPlanner {
+    private static final Logger LOGGER = LogManager.getLogger();
+    // Relative importance of a column's size vs. how recently it was accessed
+    private static final double SIZE_WEIGHT = 0.3d;
+    private static final double LAST_ACCESS_WEIGHT = 1.0d - SIZE_WEIGHT;
+    private static final double INITIAL_PUNCHABLE_THRESHOLD = 0.7d;
+    // Each planning iteration multiplies the threshold by this factor (more aggressive plans)
+    private static final double PUNCHABLE_THRESHOLD_DECREMENT = 0.7d;
+    private static final int MAX_ITERATION_COUNT = 5;
+    // Number of cloud requests after which the current plan is re-evaluated for staleness
+    private static final int REEVALUATE_PLAN_THRESHOLD = 50;
+
+    private final AtomicBoolean active;
+    private final int numberOfPrimaryKeys;
+    // Current eviction plan: bit i set means column i is planned for eviction
+    private final BitSet plan;
+    // Scratch set used by resetPlanIfNeeded() to detect stale plans
+    private final BitSet reevaluatedPlan;
+    // Columns used by secondary indexes (exempt from eviction)
+    private final IntSet indexedColumns;
+    private final ISweepClock clock;
+    private int numberOfColumns;
+    // Timestamp of the most recent access to any column
+    private long lastAccess;
+    // Largest column size observed so far (normalizes the size weight)
+    private int maxSize;
+    private int[] sizes;
+    private long[] lastAccesses;
+
+    private double punchableThreshold;
+    private long lastSweepTs;
+    private int numberOfSweptColumns;
+    private int numberOfCloudRequests;
+
+    public ColumnSweepPlanner(int numberOfPrimaryKeys, ISweepClock clock) {
+        this.clock = clock;
+        active = new AtomicBoolean(false);
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+        sizes = new int[0];
+        lastAccesses = new long[0];
+        indexedColumns = new IntOpenHashSet();
+        plan = new BitSet();
+        reevaluatedPlan = new BitSet();
+        punchableThreshold = INITIAL_PUNCHABLE_THRESHOLD;
+    }
+
+    public boolean isActive() {
+        return active.get();
+    }
+
+    public int getNumberOfPrimaryKeys() {
+        return numberOfPrimaryKeys;
+    }
+
+    /**
+     * Activates the planner: sizes stats arrays and seeds initial column sizes from the given
+     * disk components.
+     */
+    public void onActivate(int numberOfColumns, List<ILSMDiskComponent> 
diskComponents,
+            IColumnTupleProjector sweepProjector, IBufferCache bufferCache) 
throws HyracksDataException {
+        resizeStatsArrays(numberOfColumns);
+        setInitialSizes(diskComponents, sweepProjector, bufferCache);
+        active.set(true);
+    }
+
+    /** Replaces the set of eviction-exempt columns with those projected by a secondary index. */
+    public void setIndexedColumns(IColumnProjectionInfo projectionInfo) {
+        indexedColumns.clear();
+        for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); i++) 
{
+            int columnIndex = projectionInfo.getColumnIndex(i);
+            indexedColumns.add(columnIndex);
+        }
+    }
+
+    public IntSet getIndexedColumnsCopy() {
+        return new IntOpenHashSet(indexedColumns);
+    }
+
+    /**
+     * Records an access to the projected columns (updates last-access stats) and counts whether
+     * this access required reading an evicted column from the cloud.
+     */
+    public synchronized void access(IColumnProjectionInfo projectionInfo, 
boolean hasSpace) {
+        resetPlanIfNeeded(hasSpace);
+        long accessTime = clock.getCurrentTime();
+        lastAccess = accessTime;
+        int numberOfColumns = projectionInfo.getNumberOfProjectedColumns();
+        boolean requireCloudAccess = false;
+        for (int i = 0; i < numberOfColumns; i++) {
+            int columnIndex = projectionInfo.getColumnIndex(i);
+            if (columnIndex >= 0) {
+                // columnIndex can be -1 when accessing a non-existing column 
(i.e., not known by the schema)
+                lastAccesses[columnIndex] = accessTime;
+                requireCloudAccess |= numberOfSweptColumns > 0 && 
plan.get(columnIndex);
+            }
+        }
+
+        numberOfCloudRequests += requireCloudAccess ? 1 : 0;
+    }
+
+    /** Merges newly observed column sizes (e.g., reported by a write context on close). */
+    public synchronized void adjustColumnSizes(int[] newSizes, int 
numberOfColumns) {
+        resizeStatsArrays(numberOfColumns);
+        for (int i = 0; i < numberOfColumns; i++) {
+            int newSize = newSizes[i];
+            sizes[i] = Math.max(sizes[i], newSize);
+            maxSize = Math.max(maxSize, newSize);
+        }
+    }
+
+    /**
+     * Rebuilds the eviction plan, lowering the threshold each iteration until some column
+     * qualifies (or {@code MAX_ITERATION_COUNT} is reached).
+     *
+     * @return true if at least one column was planned for eviction
+     */
+    public synchronized boolean plan() {
+        plan.clear();
+        int numberOfEvictableColumns = 0;
+        int iter = 0;
+        // Calculate weights: Ensure the plan contains new columns that never 
been swept
+        while (iter < MAX_ITERATION_COUNT && numberOfEvictableColumns < 
numberOfColumns) {
+            if (numberOfEvictableColumns > 0) {
+                // Do not reiterate if we found columns to evict
+                break;
+            }
+
+            // Find evictable columns
+            numberOfEvictableColumns += findEvictableColumns(plan);
+
+            // The next iteration/plan will be more aggressive
+            punchableThreshold *= PUNCHABLE_THRESHOLD_DECREMENT;
+            iter++;
+        }
+        // Register the plan time
+        lastSweepTs = clock.getCurrentTime();
+        // Add the number of evictable columns
+        numberOfSweptColumns += numberOfEvictableColumns;
+        if (numberOfEvictableColumns > 0) {
+            LOGGER.info("Planning to evict {} columns. The evictable columns 
are {}", numberOfEvictableColumns, plan);
+            return true;
+        }
+
+        LOGGER.info("Couldn't find columns to evict after {} iteration", iter);
+        return false;
+    }
+
+    public synchronized BitSet getPlanCopy() {
+        return (BitSet) plan.clone();
+    }
+
+    /**
+     * Eviction weight of column {@code i} in [0, 1]: weighted sum of relative size and staleness.
+     * Primary-key and indexed columns return -1.0 so they never reach the threshold.
+     */
+    private double getWeight(int i, IntSet indexedColumns, int 
numberOfPrimaryKeys) {
+        if (i < numberOfPrimaryKeys || indexedColumns.contains(i)) {
+            return -1.0;
+        }
+
+        // NOTE(review): if maxSize or lastAccess is still 0, these divisions yield NaN/undefined
+        // weights — presumably sizes/accesses are always recorded before planning; confirm.
+        double sizeWeight = sizes[i] / (double) maxSize * SIZE_WEIGHT;
+        double lasAccessWeight = (lastAccess - lastAccesses[i]) / (double) 
lastAccess * LAST_ACCESS_WEIGHT;
+
+        return sizeWeight + lasAccessWeight;
+    }
+
+    // Grows the stats arrays; numberOfColumns excludes the primary-key columns
+    private void resizeStatsArrays(int numberOfColumns) {
+        sizes = IntArrays.ensureCapacity(sizes, numberOfColumns);
+        lastAccesses = LongArrays.ensureCapacity(lastAccesses, 
numberOfColumns);
+        this.numberOfColumns = numberOfColumns - numberOfPrimaryKeys;
+    }
+
+    /**
+     * Seeds column sizes from the newest component (most recent schema) and, if present, the
+     * oldest merged component (likely the largest columns).
+     */
+    private void setInitialSizes(List<ILSMDiskComponent> diskComponents, 
IColumnTupleProjector sweepProjector,
+            IBufferCache bufferCache) throws HyracksDataException {
+        // This runs when activating an index (no need to synchronize on the 
opTracker)
+        if (diskComponents.isEmpty()) {
+            return;
+        }
+
+        IColumnProjectionInfo columnProjectionInfo =
+                ColumnSweeperUtil.createColumnProjectionInfo(diskComponents, 
sweepProjector);
+        ILSMDiskComponent latestComponent = diskComponents.get(0);
+        ILSMDiskComponent oldestComponent = 
diskComponents.get(diskComponents.size() - 1);
+        ColumnBTreeReadLeafFrame leafFrame = 
ColumnSweeperUtil.createLeafFrame(columnProjectionInfo, latestComponent);
+        ColumnRanges ranges = new 
ColumnRanges(columnProjectionInfo.getNumberOfPrimaryKeys());
+        // Get the column sizes from the freshest component, which has the 
columns of the most recent schema
+        setColumnSizes(latestComponent, leafFrame, ranges, bufferCache);
+        if (isMergedComponent(oldestComponent.getId())) {
+            // Get the column sizes from the oldest merged component, which 
probably has the largest columns
+            setColumnSizes(oldestComponent, leafFrame, ranges, bufferCache);
+        }
+    }
+
+    // Reads the component's first leaf (pageZero) and records each column's length
+    private void setColumnSizes(ILSMDiskComponent diskComponent, 
ColumnBTreeReadLeafFrame leafFrame,
+            ColumnRanges ranges, IBufferCache bufferCache) throws 
HyracksDataException {
+        ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+        long dpid = BufferedFileHandle.getDiskPageId(columnBTree.getFileId(), 
columnBTree.getBulkloadLeafStart());
+        ICachedPage page = bufferCache.pin(dpid);
+        try {
+            leafFrame.setPage(page);
+            ranges.reset(leafFrame);
+            for (int i = 0; i < leafFrame.getNumberOfColumns(); i++) {
+                sizes[i] = Math.max(sizes[i], ranges.getColumnLength(i));
+                maxSize = Math.max(maxSize, sizes[i]);
+            }
+        } finally {
+            bufferCache.unpin(page);
+        }
+    }
+
+    // Adds every column whose weight meets the current threshold to 'plan'; returns how many
+    private int findEvictableColumns(BitSet plan) {
+        int numberOfEvictableColumns = 0;
+        for (int i = 0; i < sizes.length; i++) {
+            if (!plan.get(i) && getWeight(i, indexedColumns, 
numberOfPrimaryKeys) >= punchableThreshold) {
+                // Column reached a punchable threshold; include it in the 
eviction plan.
+                plan.set(i);
+                numberOfEvictableColumns++;
+            }
+        }
+
+        return numberOfEvictableColumns;
+    }
+
+    /**
+     * After enough cloud requests, recomputes the evictable set and, if it diverged from the
+     * current plan, replaces the plan (the old one is considered stale).
+     */
+    private void resetPlanIfNeeded(boolean hasSpace) {
+        if (!hasSpace || numberOfCloudRequests < REEVALUATE_PLAN_THRESHOLD) {
+            return;
+        }
+
+        numberOfCloudRequests = 0;
+        reevaluatedPlan.clear();
+        int numberOfEvictableColumns = findEvictableColumns(reevaluatedPlan);
+        // NOTE(review): nextSetBit(i) with a 0..count loop counter can revisit the same set bit
+        // and skip later ones; the usual BitSet idiom is i = bs.nextSetBit(prev + 1) — confirm
+        // this still detects divergence in all cases.
+        for (int i = 0; i < numberOfEvictableColumns; i++) {
+            int columnIndex = reevaluatedPlan.nextSetBit(i);
+            if (!plan.get(columnIndex)) {
+                // the plan contains a stale column. Invalidate!
+                plan.clear();
+                plan.or(reevaluatedPlan);
+                LOGGER.info("Re-planning to evict {} columns. The newly 
evictable columns are {}",
+                        numberOfEvictableColumns, plan);
+                break;
+            }
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
new file mode 100644
index 0000000000..4932bf0c7f
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState.READABLE_UNWRITABLE;
+import static 
org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils.isMergedComponent;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.sweeper.SweepContext;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.LSMColumnBTree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import 
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.annotations.CriticalPath;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Executes an eviction plan produced by {@link ColumnSweepPlanner}: walks the mega leaf nodes of
+ * each sweepable (merged, readable-unwritable) disk component and punches holes in the local file
+ * for the pages of planned columns, freeing local disk space while the data remains in the cloud.
+ */
+public final class ColumnSweeper {
+    private static final Logger LOGGER = LogManager.getLogger();
+    // Reusable lock payload advertising which columns are being swept (see trySweepLock below)
+    private final ColumnSweepLockInfo lockedColumns;
+    private final ColumnRanges ranges;
+    private final List<ILSMDiskComponent> sweepableComponents;
+
+    public ColumnSweeper(int numberOfPrimaryKeys) {
+        lockedColumns = new ColumnSweepLockInfo();
+        ranges = new ColumnRanges(numberOfPrimaryKeys);
+        sweepableComponents = new ArrayList<>();
+    }
+
+    /**
+     * Sweeps the index's eligible components according to {@code plan}.
+     *
+     * @return total bytes freed from local disk (0 if nothing was sweepable or the index is
+     *         being dropped)
+     */
+    public long sweep(BitSet plan, SweepContext context, IColumnTupleProjector 
sweepProjector)
+            throws HyracksDataException {
+        IndexUnit indexUnit = context.getIndexUnit();
+        LSMColumnBTree lsmColumnBTree = (LSMColumnBTree) indexUnit.getIndex();
+        IColumnProjectionInfo projectionInfo = 
captureSweepableComponents(lsmColumnBTree, sweepProjector);
+        if (projectionInfo == null) {
+            // no sweepable components
+            return 0L;
+        }
+
+        LOGGER.info("Sweeping {}", lsmColumnBTree);
+        ILSMDiskComponent latestComponent = sweepableComponents.get(0);
+        ColumnBTreeReadLeafFrame leafFrame = 
ColumnSweeperUtil.createLeafFrame(projectionInfo, latestComponent);
+        IBufferCacheReadContext bcOpCtx = SweepBufferCacheReadContext.INSTANCE;
+        lockedColumns.reset(plan);
+        long freedSpace = 0L;
+        for (int i = 0; i < sweepableComponents.size(); i++) {
+            if (context.stopSweeping()) {
+                // Exit as the index is being dropped
+                return 0L;
+            }
+
+            boolean failed = false;
+            // Components are entered one at a time to allow components to be 
merged and deactivated
+            ILSMDiskComponent diskComponent = enterAndGetComponent(i, 
lsmColumnBTree);
+            // If diskComponent is null, that means it is not a viable 
candidate anymore
+            if (diskComponent != null) {
+                try {
+                    freedSpace += sweepDiskComponent(leafFrame, diskComponent, 
plan, context, bcOpCtx);
+                } catch (Throwable e) {
+                    failed = true;
+                    throw e;
+                } finally {
+                    exitComponent(diskComponent, lsmColumnBTree, failed);
+                }
+            }
+        }
+
+        LOGGER.info("Swept {} components and freed {} from disk", 
sweepableComponents.size(),
+                StorageUtil.toHumanReadableSize(freedSpace));
+        return freedSpace;
+    }
+
+    /**
+     * Snapshots the components worth sweeping (under the opTracker lock) and builds the
+     * projection info covering them; returns null when none qualify.
+     */
+    @CriticalPath
+    private IColumnProjectionInfo captureSweepableComponents(LSMColumnBTree 
lsmColumnBTree,
+            IColumnTupleProjector sweepProjector) throws HyracksDataException {
+        ILSMOperationTracker opTracker = lsmColumnBTree.getOperationTracker();
+        sweepableComponents.clear();
+        synchronized (opTracker) {
+            List<ILSMDiskComponent> diskComponents = 
lsmColumnBTree.getDiskComponents();
+            for (int i = 0; i < diskComponents.size(); i++) {
+                ILSMDiskComponent diskComponent = diskComponents.get(i);
+                /*
+                 * Get components that are only in READABLE_UNWRITABLE state. 
Components that are currently being
+                 * merged should not be swept as they will be deleted anyway.
+                 * Also, only sweep merged components as flushed components 
are relatively smaller and should be
+                 * merged eventually. So, it is preferable to read everything 
locally when merging flushed components.
+                 * TODO should we sweep flushed components?
+                 */
+                if (isMergedComponent(diskComponent.getId()) && 
diskComponent.getState() == READABLE_UNWRITABLE
+                        && diskComponent.getComponentSize() > 0) {
+                    // The component is a good candidate to be swept
+                    sweepableComponents.add(diskComponent);
+                }
+            }
+
+            if (sweepableComponents.isEmpty()) {
+                // No sweepable components
+                return null;
+            }
+
+            return 
ColumnSweeperUtil.createColumnProjectionInfo(sweepableComponents, 
sweepProjector);
+        }
+    }
+
+    // Re-checks the component's state under the opTracker lock and enters it for scanning;
+    // returns null if the component is no longer a viable candidate (e.g., being merged)
+    private ILSMDiskComponent enterAndGetComponent(int index, LSMColumnBTree 
lsmColumnBTree)
+            throws HyracksDataException {
+        ILSMDiskComponent diskComponent = sweepableComponents.get(index);
+        synchronized (lsmColumnBTree.getOperationTracker()) {
+            // Make sure the component is still in READABLE_UNWRITABLE state
+            if (diskComponent.getState() == READABLE_UNWRITABLE
+                    && 
diskComponent.threadEnter(LSMOperationType.DISK_COMPONENT_SCAN, false)) {
+                return diskComponent;
+            }
+        }
+
+        // the component is not a viable candidate anymore
+        return null;
+    }
+
+    // Counterpart of enterAndGetComponent(); must be called even when the sweep failed
+    private void exitComponent(ILSMDiskComponent diskComponent, LSMColumnBTree 
lsmColumnBTree, boolean failed)
+            throws HyracksDataException {
+        synchronized (lsmColumnBTree.getOperationTracker()) {
+            diskComponent.threadExit(LSMOperationType.DISK_COMPONENT_SCAN, 
failed, false);
+        }
+    }
+
+    /**
+     * Walks the component's mega leaf nodes and punches holes for planned columns whose pages
+     * could be sweep-locked.
+     * NOTE(review): the accumulator is an {@code int} while the method returns {@code long} —
+     * could overflow for very large components; confirm expected magnitudes.
+     */
+    private long sweepDiskComponent(ColumnBTreeReadLeafFrame leafFrame, 
ILSMDiskComponent diskComponent, BitSet plan,
+            SweepContext context, IBufferCacheReadContext bcOpCtx) throws 
HyracksDataException {
+        long dpid = getFirstPageId(diskComponent);
+        int fileId = BufferedFileHandle.getFileId(dpid);
+        int nextPageId = BufferedFileHandle.getPageId(dpid);
+        int freedSpace = 0;
+        context.open(fileId);
+        try {
+            while (nextPageId >= 0) {
+                if (context.stopSweeping()) {
+                    // Exit as the index is being dropped
+                    return 0L;
+                }
+                CloudCachedPage page0 = 
context.pin(BufferedFileHandle.getDiskPageId(fileId, nextPageId), bcOpCtx);
+                boolean columnsLocked = false;
+                try {
+                    leafFrame.setPage(page0);
+                    nextPageId = leafFrame.getNextLeaf();
+                    columnsLocked = page0.trySweepLock(lockedColumns);
+                    if (columnsLocked) {
+                        // NOTE(review): setPage() was already called above; this second call
+                        // looks redundant — confirm whether it is required after trySweepLock().
+                        leafFrame.setPage(page0);
+                        ranges.reset(leafFrame, plan);
+                        freedSpace += punchHoles(context, leafFrame);
+                    }
+                } finally {
+                    if (columnsLocked) {
+                        page0.sweepUnlock();
+                    }
+                    context.unpin(page0, bcOpCtx);
+                }
+            }
+        } finally {
+            context.close();
+        }
+
+        return freedSpace;
+    }
+
+    // Disk page id of the component's first (bulkloaded) leaf page
+    private long getFirstPageId(ILSMDiskComponent diskComponent) {
+        ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+        int fileId = columnBTree.getFileId();
+        int firstPage = columnBTree.getBulkloadLeafStart();
+        return BufferedFileHandle.getDiskPageId(fileId, firstPage);
+    }
+
+    /**
+     * Punches a hole for every maximal run of evictable pages in the current mega leaf node.
+     * NOTE(review): assumes every clear-bit run is terminated by a set bit (e.g., a sentinel at
+     * the mega leaf's end); if nextSetBit() returned -1, the length below would go negative and
+     * nextClearBit(-1) would throw — confirm ColumnRanges guarantees the sentinel.
+     */
+    private int punchHoles(SweepContext context, ColumnBTreeReadLeafFrame 
leafFrame) throws HyracksDataException {
+        int freedSpace = 0;
+        int numberOfPages = leafFrame.getMegaLeafNodeNumberOfPages();
+        int pageZeroId = leafFrame.getPageId();
+
+        // Start from 1 as we do not evict pageZero
+        BitSet nonEvictablePages = ranges.getNonEvictablePages();
+        int start = nonEvictablePages.nextClearBit(1);
+        while (start < numberOfPages) {
+            int end = nonEvictablePages.nextSetBit(start);
+            int numberOfEvictablePages = end - start;
+            freedSpace += context.punchHole(pageZeroId + start, 
numberOfEvictablePages);
+
+            start = nonEvictablePages.nextClearBit(end);
+        }
+
+        return freedSpace;
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java
new file mode 100644
index 0000000000..e51f1d7493
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeLeafFrameFactory;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+
+public class ColumnSweeperUtil {
+    private ColumnSweeperUtil() {
+    }
+
+    public static final BitSet EMPTY = new BitSet();
+
+    static IColumnProjectionInfo 
createColumnProjectionInfo(List<ILSMDiskComponent> diskComponents,
+            IColumnTupleProjector projector) throws HyracksDataException {
+        // Get the newest disk component, which has the newest column metadata
+        ILSMDiskComponent newestComponent = diskComponents.get(0);
+        IValueReference columnMetadata = 
ColumnUtil.getColumnMetadataCopy(newestComponent.getMetadata());
+
+        return projector.createProjectionInfo(columnMetadata);
+    }
+
+    static ColumnBTreeReadLeafFrame createLeafFrame(IColumnProjectionInfo 
projectionInfo,
+            ILSMDiskComponent diskComponent) {
+        ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+        ColumnBTreeLeafFrameFactory leafFrameFactory = 
(ColumnBTreeLeafFrameFactory) columnBTree.getLeafFrameFactory();
+        return leafFrameFactory.createReadFrame(projectionInfo);
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java
new file mode 100644
index 0000000000..f36a51d149
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
/**
 * Clock abstraction used by the sweep planner/sweeper so that time can be supplied
 * (and, in tests, advanced manually) instead of reading the system clock directly.
 */
@FunctionalInterface
public interface ISweepClock {
    /**
     * @return the current time; units and epoch are defined by the implementation
     */
    long getCurrentTime();
}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java
new file mode 100644
index 0000000000..7b51d55520
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static 
org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext.readAndPersistPage;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import 
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
/**
 * Buffer-cache read context used by the column sweeper. Pins/unpins are no-ops,
 * stats are not collected, and pages read through this context are not persisted
 * locally (the sweeper runs when local disk space is under pressure).
 */
@ThreadSafe
final class SweepBufferCacheReadContext implements IBufferCacheReadContext {
    // Stateless; a single shared instance suffices for all sweeper threads
    static final IBufferCacheReadContext INSTANCE = new SweepBufferCacheReadContext();

    private SweepBufferCacheReadContext() {
    }

    @Override
    public void onPin(ICachedPage page) {
        // NoOp
    }

    @Override
    public void onUnpin(ICachedPage page) {
        // NoOp
    }

    @Override
    public boolean isNewPage() {
        return false;
    }

    @Override
    public boolean incrementStats() {
        // Do not increment the stats for the sweeper
        return false;
    }

    /**
     * Reads the page content but does NOT write it back to the local cache file
     * (last argument is {@code false}).
     */
    @Override
    public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
            CachedPage cPage) throws HyracksDataException {
        // Will not persist as the disk is pressured
        return readAndPersistPage(ioManager, fileHandle, header, cPage, false);
    }
}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
new file mode 100644
index 0000000000..6bace98fef
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+import static org.apache.hyracks.util.StorageUtil.getIntSizeInBytes;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Random;
+
+import 
org.apache.hyracks.storage.am.lsm.btree.column.dummy.DummyColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.dummy.DummySweepClock;
+import org.apache.hyracks.util.StorageUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+
/**
 * Unit test for the column sweep planner: verifies that the eviction plan changes
 * after the access pattern shifts to a different set of projected columns.
 *
 * NOTE: the test is deterministic via {@code new Random(0)}; the exact sequence of
 * RANDOM calls matters, so statements must not be reordered.
 */
public class ColumnSweepPlannerTest {
    // Upper bound used to scale the synthetic column sizes (10 MB mega leaf node)
    private static final int MAX_MEGA_LEAF_NODE_SIZE = getIntSizeInBytes(10, StorageUtil.StorageUnit.MEGABYTE);
    // Fixed seed for reproducible runs
    private static final Random RANDOM = new Random(0);
    // Manually-advanced clock so the planner's notion of time is controlled by the test
    private final DummySweepClock clock = new DummySweepClock();

    @Test
    public void test10Columns() {
        int numberOfPrimaryKeys = 1;
        int numberOfColumns = numberOfPrimaryKeys + 10;
        int[] columnSizes = createNormalColumnSizes(numberOfPrimaryKeys, numberOfColumns);
        ColumnSweepPlanner planner = new ColumnSweepPlanner(numberOfPrimaryKeys, clock);
        IntList projectedColumns = new IntArrayList();
        // The same info instance is reused; its column list is mutated between phases
        DummyColumnProjectionInfo info = new DummyColumnProjectionInfo(numberOfPrimaryKeys, QUERY, projectedColumns);

        // Adjust sizes
        planner.adjustColumnSizes(columnSizes, numberOfColumns);

        // Project 3 columns
        projectedColumns(numberOfPrimaryKeys, numberOfColumns, 3, projectedColumns);
        // access the projected columns (max 10 times)
        access(planner, info, true, 10);

        // Advance clock
        clock.advance(10);

        // Plan for eviction
        BitSet keptColumns = new BitSet();
        planner.plan();
        computeKeptColumns(planner.getPlanCopy(), keptColumns, numberOfColumns);

        // Project another 3 columns
        projectedColumns(numberOfPrimaryKeys, numberOfColumns, 3, projectedColumns);
        // access the projected columns
        access(planner, info, true, 100);

        // At this point, the plan should change
        BitSet newKeptColumns = new BitSet();
        computeKeptColumns(planner.getPlanCopy(), newKeptColumns, numberOfColumns);

        Assert.assertNotEquals(keptColumns, newKeptColumns);
    }

    // Inverts the plan bitset: a column is "kept" iff the plan does not evict it
    private void computeKeptColumns(BitSet plan, BitSet keptColumns, int numberOfColumns) {
        keptColumns.clear();
        for (int i = 0; i < numberOfColumns; i++) {
            if (!plan.get(i)) {
                keptColumns.set(i);
            }
        }

        System.out.println("Kept columns: " + keptColumns);
    }

    // Simulates between 1 and (bound - 1) accesses of 'info', advancing the clock by 1 each time
    private void access(ColumnSweepPlanner planner, DummyColumnProjectionInfo info, boolean hasSpace, int bound) {
        int numberOfAccesses = RANDOM.nextInt(1, bound);
        for (int i = 0; i < numberOfAccesses; i++) {
            planner.access(info, hasSpace);
            clock.advance(1);
        }

        System.out.println("Accessed: " + info + " " + numberOfAccesses + " time");
    }

    // Fills 'projectedColumns' with 'numberOfProjectedColumns' distinct random non-key column indexes
    private void projectedColumns(int numberPrimaryKeys, int numberOfColumns, int numberOfProjectedColumns,
            IntList projectedColumns) {
        IntSet alreadyProjectedColumns = new IntOpenHashSet();
        projectedColumns.clear();
        for (int i = 0; i < numberOfProjectedColumns; i++) {
            int columnIndex = RANDOM.nextInt(numberPrimaryKeys, numberOfColumns);
            // retry until an unused column index is drawn
            while (alreadyProjectedColumns.contains(columnIndex)) {
                columnIndex = RANDOM.nextInt(numberPrimaryKeys, numberOfColumns);
            }
            projectedColumns.add(columnIndex);
            alreadyProjectedColumns.add(columnIndex);
        }
    }

    // Generates synthetic column sizes drawn from |N(0,1)|, scaled so that the values
    // are proportional to MAX_MEGA_LEAF_NODE_SIZE. Primary-key columns keep size 0.
    private int[] createNormalColumnSizes(int numberOfPrimaryKeys, int numberOfColumns) {
        int[] columnSizes = new int[numberOfColumns];
        double[] normalDistribution = new double[numberOfColumns];
        double sum = 0.0d;
        for (int i = 0; i < numberOfColumns; i++) {
            double value = Math.abs(RANDOM.nextGaussian());
            normalDistribution[i] = value;
            sum += value;
        }

        for (int i = numberOfPrimaryKeys; i < numberOfColumns; i++) {
            int size = (int) Math.round((normalDistribution[i] / sum) * MAX_MEGA_LEAF_NODE_SIZE);
            columnSizes[i] = size;
        }

        System.out.println("Column sizes:");
        for (int i = 0; i < numberOfColumns; i++) {
            System.out.println(i + ": " + StorageUtil.toHumanReadableSize(columnSizes[i]));
        }
        System.out.println("TotalSize:" + StorageUtil.toHumanReadableSize(Arrays.stream(columnSizes).sum()));
        return columnSizes;
    }

}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java
new file mode 100644
index 0000000000..3b9d6ba68e
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.dummy;
+
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import 
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+
+import it.unimi.dsi.fastutil.ints.IntList;
+
+public class DummyColumnProjectionInfo implements IColumnProjectionInfo {
+    private final int numberOfPrimaryKeys;
+    private final ColumnProjectorType projectorType;
+    private final IntList projectedColumns;
+
+    public DummyColumnProjectionInfo(int numberOfPrimaryKeys, 
ColumnProjectorType projectorType,
+            IntList projectedColumns) {
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+        this.projectorType = projectorType;
+        this.projectedColumns = projectedColumns;
+    }
+
+    @Override
+    public int getColumnIndex(int ordinal) {
+        return projectedColumns.getInt(ordinal);
+    }
+
+    @Override
+    public int getNumberOfProjectedColumns() {
+        return projectedColumns.size();
+    }
+
+    @Override
+    public int getNumberOfPrimaryKeys() {
+        return numberOfPrimaryKeys;
+    }
+
+    @Override
+    public int getFilteredColumnIndex(int ordinal) {
+        return -1;
+    }
+
+    @Override
+    public int getNumberOfFilteredColumns() {
+        return 0;
+    }
+
+    @Override
+    public ColumnProjectorType getProjectorType() {
+        return projectorType;
+    }
+
+    @Override
+    public String toString() {
+        return projectedColumns.toString();
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java
new file mode 100644
index 0000000000..6f209ecbde
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.dummy;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ISweepClock;
+
+public class DummySweepClock implements ISweepClock {
+    long timestamp;
+
+    @Override
+    public long getCurrentTime() {
+        return timestamp;
+    }
+
+    public void advance(long delta) {
+        timestamp += delta;
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
index 1779527fb5..98d2d73321 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
@@ -55,15 +55,17 @@ public class LSMComponentIdUtils {
     }
 
    /**
     * Persists the component id's min/max ids into the component metadata under
     * {@code COMPONENT_ID_MIN_KEY}/{@code COMPONENT_ID_MAX_KEY}.
     *
     * @param id       the component id to persist
     * @param metadata the component metadata to write into
     */
    public static void persist(ILSMComponentId id, IComponentMetadata metadata) throws HyracksDataException {
        metadata.put(COMPONENT_ID_MIN_KEY, LongPointable.FACTORY.createPointable(id.getMinId()));
        metadata.put(COMPONENT_ID_MAX_KEY, LongPointable.FACTORY.createPointable(id.getMaxId()));
    }
 
     public static ILSMComponentId union(ILSMComponentId id1, ILSMComponentId 
id2) {
-        long minId = Long.min(((LSMComponentId) id1).getMinId(), 
((LSMComponentId) id2).getMinId());
-        long maxId = Long.max(((LSMComponentId) id1).getMaxId(), 
((LSMComponentId) id2).getMaxId());
+        long minId = Long.min(id1.getMinId(), id2.getMinId());
+        long maxId = Long.max(id1.getMaxId(), id2.getMaxId());
         return new LSMComponentId(minId, maxId);
     }
 
+    public static boolean isMergedComponent(ILSMComponentId id) {
+        return id.getMinId() < id.getMaxId();
+    }
 }

Reply via email to