This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 746e3f5fea [ASTERIXDB-3389][STO] Support caching/evicting columns
746e3f5fea is described below
commit 746e3f5fea806db0bf219a2ba56554b1a2797bd4
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Wed May 8 21:58:37 2024 -0700
[ASTERIXDB-3389][STO] Support caching/evicting columns
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Prepare columnar indexes to support caching and
evicting columns.
Change-Id: Ib557608b0b25219ffc00a76325091a92f5e77698
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18260
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../AbstractColumnImmutableReadMetadata.java | 12 +-
.../operation/lsm/flush/BatchFinalizerVisitor.java | 5 +-
.../flush/FlushColumnTupleReaderWriterFactory.java | 4 +-
.../lsm/flush/FlushColumnTupleWithMetaWriter.java | 5 +-
.../lsm/flush/FlushColumnTupleWriter.java | 6 +-
.../load/LoadColumnTupleReaderWriterFactory.java | 2 +-
.../operation/lsm/load/LoadColumnTupleWriter.java | 5 +-
.../lsm/merge/MergeColumnReadMetadata.java | 4 +-
.../merge/MergeColumnTupleReaderWriterFactory.java | 3 +-
.../lsm/merge/MergeColumnTupleWriter.java | 12 +-
.../create/PrimaryScanColumnTupleProjector.java | 4 +-
.../upsert/UpsertPreviousColumnTupleProjector.java | 4 +-
.../operation/query/QueryColumnMetadata.java | 11 +-
.../operation/query/QueryColumnTupleProjector.java | 8 +-
.../query/QueryColumnTupleProjectorFactory.java | 4 +-
.../query/QueryColumnWithMetaMetadata.java | 13 +-
.../query/QueryColumnWithMetaTupleProjector.java | 6 +-
.../asterix/column/values/IColumnBatchWriter.java | 10 +-
.../column/values/writer/ColumnBatchWriter.java | 41 ++-
.../column/test/bytes/AbstractBytesTest.java | 3 +-
.../asterix/column/test/bytes/FlushLargeTest.java | 13 +-
.../asterix/column/test/bytes/FlushSmallTest.java | 13 +-
.../asterix/column/test/dummy/AssemblerTest.java | 3 +-
.../values/writer/NoOpColumnBatchWriter.java | 11 +-
.../hyracks-storage-am-lsm-btree-column/pom.xml | 10 +
...rojectionInfo.java => ColumnProjectorType.java} | 34 +--
.../api/projection/IColumnProjectionInfo.java | 5 +
.../cloud/CloudColumnIndexDiskCacheManager.java | 103 +++++++
.../am/lsm/btree/column/cloud/ColumnRanges.java | 316 +++++++++++++++++++++
.../buffercache/read/CloudColumnReadContext.java | 197 +++++++++++++
.../buffercache/read/CloudMegaPageReadContext.java | 202 +++++++++++++
.../buffercache/write/CloudColumnWriteContext.java | 152 ++++++++++
.../column/cloud/sweep/ColumnSweepLockInfo.java | 56 ++++
.../column/cloud/sweep/ColumnSweepPlanner.java | 260 +++++++++++++++++
.../btree/column/cloud/sweep/ColumnSweeper.java | 220 ++++++++++++++
.../column/cloud/sweep/ColumnSweeperUtil.java | 55 ++++
.../lsm/btree/column/cloud/sweep/ISweepClock.java | 24 ++
.../cloud/sweep/SweepBufferCacheReadContext.java | 68 +++++
.../column/cloud/sweep/ColumnSweepPlannerTest.java | 139 +++++++++
.../column/dummy/DummyColumnProjectionInfo.java | 72 +++++
.../am/lsm/btree/column/dummy/DummySweepClock.java | 34 +++
.../am/lsm/common/util/LSMComponentIdUtils.java | 12 +-
42 files changed, 2059 insertions(+), 102 deletions(-)
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
index 5ac38d7f19..b7c40bd18b 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/metadata/AbstractColumnImmutableReadMetadata.java
@@ -21,17 +21,27 @@ package org.apache.asterix.column.metadata;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
 public abstract class AbstractColumnImmutableReadMetadata extends AbstractColumnImmutableMetadata
         implements IColumnProjectionInfo {
+    private final ColumnProjectorType projectorType;
+
     protected AbstractColumnImmutableReadMetadata(ARecordType datasetType, ARecordType metaType,
-            int numberOfPrimaryKeys, IValueReference serializedMetadata, int numberOfColumns) {
+            int numberOfPrimaryKeys, IValueReference serializedMetadata, int numberOfColumns,
+            ColumnProjectorType projectorType) {
         super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, numberOfColumns);
+        this.projectorType = projectorType;
     }
     /**
      * @return the corresponding reader (merge reader or query reader) given <code>this</code> metadata
      */
     public abstract AbstractColumnTupleReader createTupleReader();
+
+    @Override
+    public final ColumnProjectorType getProjectorType() {
+        return projectorType;
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
index 4cbe09bc9c..951a9fe7e6 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
@@ -59,9 +59,8 @@ public final class BatchFinalizerVisitor implements ISchemaNodeVisitor<Void, Abs
             columnMetadata.getMetaRoot().accept(this, null);
         }
-        int allocatedSpace = batchWriter.writePrimaryKeyColumns(primaryKeyWriters);
-        allocatedSpace += batchWriter.writeColumns(orderedColumns);
-        return allocatedSpace;
+        batchWriter.writePrimaryKeyColumns(primaryKeyWriters);
+        return batchWriter.writeColumns(orderedColumns);
     }
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
index f597e4f694..c64b074e2e 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
@@ -41,10 +41,10 @@ public class FlushColumnTupleReaderWriterFactory extends AbstractColumnTupleRead
         if (flushColumnMetadata.getMetaType() == null) {
             //no meta
             return new FlushColumnTupleWriter(flushColumnMetadata, pageSize, maxNumberOfTuples, tolerance,
-                    maxLeafNodeSize);
+                    maxLeafNodeSize, writeContext);
         }
         return new FlushColumnTupleWithMetaWriter(flushColumnMetadata, pageSize, maxNumberOfTuples, tolerance,
-                maxLeafNodeSize);
+                maxLeafNodeSize, writeContext);
     }
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
index b51b3953f7..a1abf5e8a8 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
@@ -21,6 +21,7 @@ package org.apache.asterix.column.operation.lsm.flush;
 import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
 import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
 public class FlushColumnTupleWithMetaWriter extends FlushColumnTupleWriter {
@@ -28,8 +29,8 @@ public class FlushColumnTupleWithMetaWriter extends FlushColumnTupleWriter {
     private final RecordLazyVisitablePointable metaPointable;
     public FlushColumnTupleWithMetaWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
-            double tolerance, int maxLeafNodeSize) {
-        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize);
+            double tolerance, int maxLeafNodeSize, IColumnWriteContext writeContext) {
+        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize, writeContext);
         metaColumnTransformer = new ColumnTransformer(columnMetadata, columnMetadata.getMetaRoot());
         metaPointable = new TypedRecordLazyVisitablePointable(columnMetadata.getMetaType());
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
index 41cad4932c..65f5eb44cf 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
 public class FlushColumnTupleWriter extends AbstractColumnTupleWriter {
@@ -45,11 +46,11 @@ public class FlushColumnTupleWriter extends AbstractColumnTupleWriter {
     protected int primaryKeysEstimatedSize;
     public FlushColumnTupleWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
-            double tolerance, int maxLeafNodeSize) {
+            double tolerance, int maxLeafNodeSize, IColumnWriteContext writeContext) {
         this.columnMetadata = columnMetadata;
         transformer = new ColumnTransformer(columnMetadata, columnMetadata.getRoot());
         finalizer = new BatchFinalizerVisitor(columnMetadata);
-        writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance);
+        writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance, writeContext);
         this.maxNumberOfTuples = maxNumberOfTuples;
         this.maxLeafNodeSize = maxLeafNodeSize;
         pointable = new TypedRecordLazyVisitablePointable(columnMetadata.getDatasetType());
@@ -105,6 +106,7 @@ public class FlushColumnTupleWriter extends AbstractColumnTupleWriter {
     @Override
     public final void close() {
         columnMetadata.close();
+        writer.close();
     }
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
index 85569f90cc..531502efab 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
@@ -40,6 +40,6 @@ public class LoadColumnTupleReaderWriterFactory extends FlushColumnTupleReaderWr
     public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata,
             IColumnWriteContext writeContext) {
         return new LoadColumnTupleWriter((FlushColumnMetadata) columnMetadata, pageSize, maxNumberOfTuples, tolerance,
-                maxLeafNodeSize, MultiComparator.create(cmpFactories));
+                maxLeafNodeSize, MultiComparator.create(cmpFactories), writeContext);
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
index ca14000d45..0ba49cf9bf 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 import org.apache.hyracks.storage.common.MultiComparator;
 public class LoadColumnTupleWriter extends FlushColumnTupleWriter {
@@ -32,8 +33,8 @@ public class LoadColumnTupleWriter extends FlushColumnTupleWriter {
     private final MultiComparator comparator;
     public LoadColumnTupleWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
-            double tolerance, int maxLeafNodeSize, MultiComparator comparator) {
-        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize);
+            double tolerance, int maxLeafNodeSize, MultiComparator comparator, IColumnWriteContext writeContext) {
+        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize, writeContext);
         prevTupleKeys = PointableTupleReference.create(columnMetadata.getNumberOfPrimaryKeys(),
                 ArrayBackedValueStorage::new);
         this.comparator = comparator;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
index a1eff69c4b..0152f5a908 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.lsm.merge;
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -43,7 +45,7 @@ public final class MergeColumnReadMetadata extends AbstractColumnImmutableReadMe
     private MergeColumnReadMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
             IColumnValuesReader[] columnReaders, IValueReference serializedMetadata) {
-        super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, columnReaders.length);
+        super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, columnReaders.length, MERGE);
         this.columnReaders = columnReaders;
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
index d792855792..3f98c5fb62 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
@@ -38,7 +38,8 @@ public class MergeColumnTupleReaderWriterFactory extends AbstractColumnTupleRead
     public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata,
             IColumnWriteContext writeContext) {
         MergeColumnWriteMetadata mergeWriteMetadata = (MergeColumnWriteMetadata) columnMetadata;
-        return new MergeColumnTupleWriter(mergeWriteMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize);
+        return new MergeColumnTupleWriter(mergeWriteMetadata, pageSize, maxNumberOfTuples, tolerance, maxLeafNodeSize,
+                writeContext);
     }
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
index d3c102a6c4..5912a3bf8c 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -34,6 +34,7 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 import org.apache.hyracks.storage.am.lsm.btree.column.error.ColumnarValueException;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -52,7 +53,7 @@ public class MergeColumnTupleWriter extends AbstractColumnTupleWriter {
     private int numberOfAntiMatter;
     public MergeColumnTupleWriter(MergeColumnWriteMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
-            double tolerance, int maxLeafNodeSize) {
+            double tolerance, int maxLeafNodeSize, IColumnWriteContext writeContext) {
         this.columnMetadata = columnMetadata;
         this.maxLeafNodeSize = maxLeafNodeSize;
         List<IColumnTupleIterator> componentsTuplesList = columnMetadata.getComponentsTuples();
@@ -68,7 +69,7 @@ public class MergeColumnTupleWriter extends AbstractColumnTupleWriter {
         }
         this.maxNumberOfTuples = getMaxNumberOfTuples(maxNumberOfTuples, totalNumberOfTuples, totalLength);
         this.writtenComponents = new RunLengthIntArray();
-        writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance);
+        writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance, writeContext);
         writtenComponents.reset();
         primaryKeyWriters = new IColumnValuesWriter[columnMetadata.getNumberOfPrimaryKeys()];
         for (int i = 0; i < primaryKeyWriters.length; i++) {
@@ -142,16 +143,17 @@ public class MergeColumnTupleWriter extends AbstractColumnTupleWriter {
             orderedColumns.add(columnMetadata.getWriter(i));
         }
         writer.setPageZeroBuffer(pageZero, numberOfColumns, numberOfPrimaryKeys);
-        int allocatedSpace = writer.writePrimaryKeyColumns(primaryKeyWriters);
-        allocatedSpace += writer.writeColumns(orderedColumns);
+        writer.writePrimaryKeyColumns(primaryKeyWriters);
+        int totalLength = writer.writeColumns(orderedColumns);
         numberOfAntiMatter = 0;
-        return allocatedSpace;
+        return totalLength;
     }
     @Override
     public void close() {
         columnMetadata.close();
+        writer.close();
     }
     private void writePrimaryKeys(MergeColumnTupleReference columnTuple) throws HyracksDataException {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/create/PrimaryScanColumnTupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/create/PrimaryScanColumnTupleProjector.java
index c86318e1fc..57ad0ac6a4 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/create/PrimaryScanColumnTupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/create/PrimaryScanColumnTupleProjector.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.lsm.secondary.create;
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collections;
@@ -40,7 +42,7 @@ final class PrimaryScanColumnTupleProjector implements IColumnTupleProjector {
             ARecordType requestedType) {
         projector = new QueryColumnTupleProjector(datasetType, numberOfPrimaryKeys, requestedType,
                 Collections.emptyMap(), NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpWarningCollector.INSTANCE, null);
+                NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpWarningCollector.INSTANCE, null, MODIFY);
     }
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/upsert/UpsertPreviousColumnTupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/upsert/UpsertPreviousColumnTupleProjector.java
index 34b029186d..2b95c1a622 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/upsert/UpsertPreviousColumnTupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/secondary/upsert/UpsertPreviousColumnTupleProjector.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.lsm.secondary.upsert;
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Collections;
@@ -42,7 +44,7 @@ final class UpsertPreviousColumnTupleProjector implements IColumnTupleProjector
         builder = new ArrayTupleBuilder(numberOfPrimaryKeys + 1);
         projector = new QueryColumnTupleProjector(datasetType, numberOfPrimaryKeys, requestedType,
                 Collections.emptyMap(), NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpWarningCollector.INSTANCE, null);
+                NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpWarningCollector.INSTANCE, null, MODIFY);
     }
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
index d50af3808c..a697e0d7e7 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
@@ -55,6 +55,7 @@ import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.apache.hyracks.util.LogRedactionUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -78,9 +79,9 @@ public class QueryColumnMetadata extends AbstractColumnImmutableReadMetadata {
             FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, IColumnValuesReaderFactory readerFactory,
             IValueGetterFactory valueGetterFactory, IColumnFilterEvaluator normalizedFilterEvaluator,
             List<IColumnRangeFilterValueAccessor> filterValueAccessors,
-            IColumnIterableFilterEvaluator columnFilterEvaluator, List<IColumnValuesReader> filterColumnReaders)
-            throws HyracksDataException {
-        super(datasetType, metaType, primaryKeyReaders.length, serializedMetadata, -1);
+            IColumnIterableFilterEvaluator columnFilterEvaluator, List<IColumnValuesReader> filterColumnReaders,
+            ColumnProjectorType projectorType) throws HyracksDataException {
+        super(datasetType, metaType, primaryKeyReaders.length, serializedMetadata, -1, projectorType);
         this.fieldNamesDictionary = fieldNamesDictionary;
         this.primaryKeyReaders = primaryKeyReaders;
         this.normalizedFilterEvaluator = normalizedFilterEvaluator;
@@ -175,7 +176,7 @@ public class QueryColumnMetadata extends AbstractColumnImmutableReadMetadata {
             Map<String, FunctionCallInformation> functionCallInfoMap,
             IColumnRangeFilterEvaluatorFactory normalizedEvaluatorFactory,
             IColumnIterableFilterEvaluatorFactory columnFilterEvaluatorFactory, IWarningCollector warningCollector,
-            IHyracksTaskContext context) throws IOException {
+            IHyracksTaskContext context, ColumnProjectorType projectorType) throws IOException {
         byte[] bytes = serializedMetadata.getByteArray();
         int offset = serializedMetadata.getStartOffset();
         int length = serializedMetadata.getLength();
@@ -230,7 +231,7 @@ public class QueryColumnMetadata extends AbstractColumnImmutableReadMetadata {
         return new QueryColumnMetadata(datasetType, null, primaryKeyReaders, serializedMetadata, fieldNamesDictionary,
                 clippedRoot, readerFactory, valueGetterFactory, normalizedFilterEvaluator, filterValueAccessors,
-                columnFilterEvaluator, filterColumnReaders);
+                columnFilterEvaluator, filterColumnReaders, projectorType);
     }
     protected static ObjectSchemaNode clip(ARecordType requestedType, ObjectSchemaNode root,
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjector.java
index 369a891aa7..d17cfbd241 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjector.java
@@ -37,6 +37,7 @@ import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
@@ -47,6 +48,7 @@ public class QueryColumnTupleProjector implements IColumnTupleProjector {
     protected final Map<String, FunctionCallInformation> functionCallInfoMap;
     protected final IWarningCollector warningCollector;
     protected final IHyracksTaskContext context;
+    protected final ColumnProjectorType projectorType;
     protected final IColumnRangeFilterEvaluatorFactory normalizedFilterEvaluatorFactory;
     protected final IColumnIterableFilterEvaluatorFactory columnFilterEvaluatorFactory;
     private final AssembledTupleReference assembledTupleReference;
@@ -55,7 +57,7 @@ public class QueryColumnTupleProjector implements IColumnTupleProjector {
             Map<String, FunctionCallInformation> functionCallInfoMap,
             IColumnRangeFilterEvaluatorFactory normalizedFilterEvaluatorFactory,
             IColumnIterableFilterEvaluatorFactory columnFilterEvaluatorFactory, IWarningCollector warningCollector,
-            IHyracksTaskContext context) {
+            IHyracksTaskContext context, ColumnProjectorType projectorType) {
         this.datasetType = datasetType;
         this.numberOfPrimaryKeys = numberOfPrimaryKeys;
         this.requestedType = requestedType;
@@ -64,6 +66,7 @@ public class QueryColumnTupleProjector implements IColumnTupleProjector {
         this.columnFilterEvaluatorFactory = columnFilterEvaluatorFactory;
         this.warningCollector = warningCollector;
         this.context = context;
+        this.projectorType = projectorType;
         assembledTupleReference = new AssembledTupleReference(getNumberOfTupleFields());
     }
@@ -72,7 +75,8 @@ public class QueryColumnTupleProjector implements IColumnTupleProjector {
         try {
             return QueryColumnMetadata.create(datasetType, numberOfPrimaryKeys, serializedMetadata,
                     new ColumnValueReaderFactory(), ValueGetterFactory.INSTANCE, requestedType, functionCallInfoMap,
-                    normalizedFilterEvaluatorFactory, columnFilterEvaluatorFactory, warningCollector, context);
+                    normalizedFilterEvaluatorFactory, columnFilterEvaluatorFactory, warningCollector, context,
+                    projectorType);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjectorFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjectorFactory.java
index 0265a050cd..e333405e77 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjectorFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnTupleProjectorFactory.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.query;
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+
 import java.util.Map;
 import org.apache.asterix.column.filter.iterable.IColumnIterableFilterEvaluatorFactory;
@@ -61,7 +63,7 @@ public class QueryColumnTupleProjectorFactory implements ITupleProjectorFactory
         if (requestedMetaType == null) {
             // The dataset does not contain a meta part
             return new QueryColumnTupleProjector(datasetType, numberOfPrimaryKeys, requestedType, functionCallInfo,
-                    rangeFilterEvaluatorFactory, columnFilterEvaluatorFactory, warningCollector, context);
+                    rangeFilterEvaluatorFactory, columnFilterEvaluatorFactory, warningCollector, context, QUERY);
         }
         // The dataset has a meta part
         return new QueryColumnWithMetaTupleProjector(datasetType, metaType, numberOfPrimaryKeys, requestedType,
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
index 34e40ffa3d..8b23c059b0 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
@@ -51,6 +51,7 @@ import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 /**
  * Query column metadata (with metaRecord)
@@ -63,10 +64,11 @@ public final class QueryColumnWithMetaMetadata extends QueryColumnMetadata {
             FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot,
             IColumnValuesReaderFactory readerFactory, IValueGetterFactory valueGetterFactory,
             IColumnFilterEvaluator filterEvaluator, List<IColumnRangeFilterValueAccessor> filterValueAccessors,
-            IColumnIterableFilterEvaluator columnFilterEvaluator, List<IColumnValuesReader> filterColumnReaders)
-            throws HyracksDataException {
+            IColumnIterableFilterEvaluator columnFilterEvaluator, List<IColumnValuesReader> filterColumnReaders,
+            ColumnProjectorType projectorType) throws HyracksDataException {
         super(datasetType, metaType, primaryKeyReaders, serializedMetadata, fieldNamesDictionary, root, readerFactory,
-                valueGetterFactory, filterEvaluator, filterValueAccessors, columnFilterEvaluator, filterColumnReaders);
+                valueGetterFactory, filterEvaluator, filterValueAccessors, columnFilterEvaluator, filterColumnReaders,
+                projectorType);
         metaAssembler = new ColumnAssembler(metaRoot, metaType, this, readerFactory, valueGetterFactory);
     }
@@ -121,7 +123,7 @@ public final class QueryColumnWithMetaMetadata extends QueryColumnMetadata {
             Map<String, FunctionCallInformation> functionCallInfo, ARecordType metaRequestedType,
             IColumnRangeFilterEvaluatorFactory normalizedEvaluatorFactory,
             IColumnIterableFilterEvaluatorFactory columnFilterEvaluatorFactory, IWarningCollector warningCollector,
-            IHyracksTaskContext context) throws IOException {
+            IHyracksTaskContext context, ColumnProjectorType projectorType) throws IOException {
         byte[] bytes = serializedMetadata.getByteArray();
         int offset = serializedMetadata.getStartOffset();
         int length = serializedMetadata.getLength();
@@ -179,6 +181,7 @@ public final class QueryColumnWithMetaMetadata extends QueryColumnMetadata {
         return new QueryColumnWithMetaMetadata(datasetType, metaType, primaryKeyReaders, serializedMetadata,
                 fieldNamesDictionary, clippedRoot, metaClippedRoot, readerFactory, valueGetterFactory,
-                normalizedFilterEvaluator, filterValueAccessors, columnFilterEvaluator, filterColumnReaders);
+                normalizedFilterEvaluator, filterValueAccessors, columnFilterEvaluator, filterColumnReaders,
+                projectorType);
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaTupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaTupleProjector.java
index f31c7ccd54..74524fe55a 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaTupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaTupleProjector.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.column.operation.query;
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Map;
@@ -47,7 +49,7 @@ public class QueryColumnWithMetaTupleProjector extends QueryColumnTupleProjector
             IColumnIterableFilterEvaluatorFactory columnFilterEvaluatorFactory, IWarningCollector warningCollector,
             IHyracksTaskContext context) {
         super(datasetType, numberOfPrimaryKeys, requestedType, functionCallInfoMap, filterEvaluator,
-                columnFilterEvaluatorFactory, warningCollector, context);
+                columnFilterEvaluatorFactory, warningCollector, context, QUERY);
         this.metaType = metaType;
         this.requestedMetaType = requestedMetaType;
     }
@@ -58,7 +60,7 @@ public class QueryColumnWithMetaTupleProjector extends QueryColumnTupleProjector
             return QueryColumnWithMetaMetadata.create(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata,
                     new ColumnValueReaderFactory(), ValueGetterFactory.INSTANCE, requestedType, functionCallInfoMap,
                     requestedMetaType, normalizedFilterEvaluatorFactory, columnFilterEvaluatorFactory, warningCollector,
-                    context);
+                    context, projectorType);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
index fc1173fcec..063743d8ae 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
@@ -30,15 +30,19 @@ public interface IColumnBatchWriter {
      * Writes the primary keys' values to Page0
      *
      * @param primaryKeyWriters primary keys' writers
-     * @return the allocated space for the primary keys' writers
      */
-    int writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException;
+    void writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException;
     /**
      * Writes the non-key values to multiple pages
      *
      * @param nonKeysColumnWriters non-key values' writers
-     * @return the allocated space for the non-key values' writers
+     * @return the total length of all columns (including pageZero)
      */
     int writeColumns(PriorityQueue<IColumnValuesWriter> nonKeysColumnWriters) throws HyracksDataException;
+
+    /**
+     * Closes the writer
+     */
+    void close();
 }
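The interface now separates allocated space from actual length: writePrimaryKeyColumns() no longer returns a value, writeColumns() reports the total on-disk length of the batch (page zero included), and callers must close() the writer to release its write context. A minimal sketch of a call site under the new contract, mirroring the BatchFinalizerVisitor change above (the wrapper method itself is hypothetical):

    // Finalizes one batch under the revised IColumnBatchWriter contract.
    static int finalizeBatch(IColumnBatchWriter batchWriter, IColumnValuesWriter[] primaryKeyWriters,
            PriorityQueue<IColumnValuesWriter> orderedColumns) throws HyracksDataException {
        // Primary keys go to page zero; the call no longer reports allocated space.
        batchWriter.writePrimaryKeyColumns(primaryKeyWriters);
        // Returns the total length of all columns, including page zero.
        return batchWriter.writeColumns(orderedColumns);
    }

The owning tuple writer is expected to invoke close() from its own close(), as FlushColumnTupleWriter and MergeColumnTupleWriter do above.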
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
index 6fbdc27016..1d8f6847a4 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
@@ -31,6 +31,7 @@ import org.apache.asterix.column.values.IColumnValuesWriter;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
 /**
  * A writer for a batch of columns' values
  */
@@ -40,17 +41,19 @@ public final class ColumnBatchWriter implements IColumnBatchWriter {
     private final MultiPersistentBufferBytesOutputStream columns;
     private final int pageSize;
     private final double tolerance;
+    private final IColumnWriteContext writeContext;
     private final IReservedPointer columnLengthPointer;
-
     private ByteBuffer pageZero;
     private int columnsOffset;
     private int filtersOffset;
     private int primaryKeysOffset;
     private int nonKeyColumnStartOffset;
-    public ColumnBatchWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int pageSize, double tolerance) {
+    public ColumnBatchWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int pageSize, double tolerance,
+            IColumnWriteContext writeContext) {
         this.pageSize = pageSize;
         this.tolerance = tolerance;
+        this.writeContext = writeContext;
         primaryKeys = new ByteBufferOutputStream();
         columns = new MultiPersistentBufferBytesOutputStream(multiPageOpRef);
         columnLengthPointer = columns.createPointer();
@@ -74,39 +77,48 @@ public final class ColumnBatchWriter implements IColumnBatchWriter {
     }
     @Override
-    public int writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException {
-        int allocatedSpace = 0;
+    public void writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException {
         for (int i = 0; i < primaryKeyWriters.length; i++) {
             IColumnValuesWriter writer = primaryKeyWriters[i];
             setColumnOffset(i, primaryKeysOffset + primaryKeys.size());
             writer.flush(primaryKeys);
-            allocatedSpace += writer.getAllocatedSpace();
         }
-        return allocatedSpace;
     }
     @Override
     public int writeColumns(PriorityQueue<IColumnValuesWriter> nonKeysColumnWriters) throws HyracksDataException {
-        int allocatedSpace = 0;
         columns.reset();
         while (!nonKeysColumnWriters.isEmpty()) {
             IColumnValuesWriter writer = nonKeysColumnWriters.poll();
             writeColumn(writer);
-            allocatedSpace += writer.getAllocatedSpace();
         }
-        return allocatedSpace;
+
+        // compute the final length
+        int totalLength = nonKeyColumnStartOffset + columns.size();
+        // reset to ensure the last buffer's position and limit are set appropriately
+        columns.reset();
+        return totalLength;
+    }
+
+    @Override
+    public void close() {
+        writeContext.close();
     }
     private void writeColumn(IColumnValuesWriter writer) throws HyracksDataException {
+        boolean overlapping = true;
         if (!hasEnoughSpace(columns.getCurrentBufferPosition(), writer)) {
             /*
              * We reset the columns stream to write all pages and confiscate a new buffer to minimize splitting
              * the column's values into multiple pages.
             */
+            overlapping = false;
             nonKeyColumnStartOffset += columns.capacity();
             columns.reset();
         }
+        int columnIndex = writer.getColumnIndex();
+        writeContext.startWritingColumn(columnIndex, overlapping);
         int columnRelativeOffset = columns.size();
         columns.reserveInteger(columnLengthPointer);
         setColumnOffset(writer.getColumnIndex(), nonKeyColumnStartOffset + columnRelativeOffset);
@@ -116,10 +128,19 @@ public final class ColumnBatchWriter implements IColumnBatchWriter {
         int length = columns.size() - columnRelativeOffset;
         columnLengthPointer.setInteger(length);
+        writeContext.endWritingColumn(columnIndex, length);
     }
     private boolean hasEnoughSpace(int bufferPosition, IColumnValuesWriter columnWriter) {
-        //Estimated size mostly overestimate the size
+        if (bufferPosition == 0) {
+            // if the current buffer is empty, then use it
+            return true;
+        } else if (tolerance == 1.0d) {
+            // if tolerance is 100%, avoid doing any calculations and start with a new page
+            return false;
+        }
+
+        // The estimated size mostly overestimates the actual size
        int columnSize = columnWriter.getEstimatedSize();
        float remainingPercentage = (pageSize - bufferPosition) / (float) pageSize;
        if (columnSize > pageSize) {
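The two new guards in hasEnoughSpace() short-circuit the size estimate. A sketch of just those fast paths, isolated for clarity (the hunk is truncated before the size-based estimate, so that decision is passed in here as a precomputed flag rather than reconstructed):

    // Fast paths of ColumnBatchWriter#hasEnoughSpace; `sizeBasedFit` stands in
    // for the elided estimate-vs-remaining-space logic.
    static boolean useCurrentBuffer(int bufferPosition, double tolerance, boolean sizeBasedFit) {
        if (bufferPosition == 0) {
            // An empty buffer wastes nothing, so always use it.
            return true;
        }
        if (tolerance == 1.0d) {
            // Per the comment above: at 100% tolerance, skip the calculations
            // and start the column on a fresh page.
            return false;
        }
        return sizeBasedFit;
    }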
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index c4f8727b89..55097c5d3e 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -62,6 +62,7 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write.DefaultColumnWriteContext;
 import org.apache.hyracks.util.StorageUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -151,7 +152,7 @@ public abstract class AbstractBytesTest extends TestBase {
             int numberOfTuplesToWrite) throws IOException {
         IColumnWriteMultiPageOp multiPageOp = columnMetadata.getMultiPageOpRef().getValue();
         FlushColumnTupleWriter writer = new FlushColumnTupleWriter(columnMetadata, PAGE_SIZE, MAX_NUMBER_OF_TUPLES,
-                TOLERANCE, MAX_LEAF_NODE_SIZE);
+                TOLERANCE, MAX_LEAF_NODE_SIZE, DefaultColumnWriteContext.INSTANCE);
         try {
             return writeTuples(fileId, writer, records, numberOfTuplesToWrite, multiPageOp);
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushLargeTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushLargeTest.java
index 6a0256c353..296a09f67b 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushLargeTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushLargeTest.java
@@ -34,6 +34,7 @@ import org.apache.asterix.common.exceptions.NoOpWarningCollector;
 import org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -63,12 +64,12 @@ public class FlushLargeTest extends AbstractBytesTest {
         FlushColumnMetadata columnMetadata = prepareNewFile(fileId);
         List<IValueReference> record = getParsedRecords();
         List<DummyPage> pageZeros = transform(fileId, columnMetadata, record, numberOfTuplesToWrite);
-        QueryColumnMetadata readMetadata =
-                QueryColumnMetadata.create(columnMetadata.getDatasetType(), columnMetadata.getNumberOfPrimaryKeys(),
-                        columnMetadata.serializeColumnsMetadata(), new ColumnValueReaderFactory(),
-                        ValueGetterFactory.INSTANCE, ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE,
-                        Collections.emptyMap(), NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                        NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpWarningCollector.INSTANCE, null);
+        QueryColumnMetadata readMetadata = QueryColumnMetadata.create(columnMetadata.getDatasetType(),
+                columnMetadata.getNumberOfPrimaryKeys(), columnMetadata.serializeColumnsMetadata(),
+                new ColumnValueReaderFactory(), ValueGetterFactory.INSTANCE,
+                ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE, Collections.emptyMap(),
+                NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpColumnFilterEvaluatorFactory.INSTANCE,
+                NoOpWarningCollector.INSTANCE, null, ColumnProjectorType.QUERY);
         writeResult(fileId, readMetadata, pageZeros);
         testCase.compareRepeated(numberOfTuplesToWrite);
     }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushSmallTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushSmallTest.java
index 8b45142c78..463b89f8e1 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushSmallTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/FlushSmallTest.java
@@ -34,6 +34,7 @@ import org.apache.asterix.common.exceptions.NoOpWarningCollector;
 import org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -62,12 +63,12 @@ public class FlushSmallTest extends AbstractBytesTest {
         FlushColumnMetadata columnMetadata = prepareNewFile(fileId);
         List<IValueReference> record = getParsedRecords();
         List<DummyPage> pageZeros = transform(fileId, columnMetadata, record, record.size());
-        QueryColumnMetadata readMetadata =
-                QueryColumnMetadata.create(columnMetadata.getDatasetType(), columnMetadata.getNumberOfPrimaryKeys(),
-                        columnMetadata.serializeColumnsMetadata(), new ColumnValueReaderFactory(),
-                        ValueGetterFactory.INSTANCE, ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE,
-                        Collections.emptyMap(), NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                        NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpWarningCollector.INSTANCE, null);
+        QueryColumnMetadata readMetadata = QueryColumnMetadata.create(columnMetadata.getDatasetType(),
+                columnMetadata.getNumberOfPrimaryKeys(), columnMetadata.serializeColumnsMetadata(),
+                new ColumnValueReaderFactory(), ValueGetterFactory.INSTANCE,
+                ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE, Collections.emptyMap(),
+                NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpColumnFilterEvaluatorFactory.INSTANCE,
+                NoOpWarningCollector.INSTANCE, null, ColumnProjectorType.QUERY);
         writeResult(fileId, readMetadata, pageZeros);
         testCase.compare();
     }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
index 80106dcebe..ab06484a2d 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
@@ -49,6 +49,7 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -99,7 +100,7 @@ public class AssemblerTest extends AbstractDummyTest {
                 columnMetadata.getNumberOfPrimaryKeys(), columnMetadata.serializeColumnsMetadata(), readerFactory,
                 DummyValueGetterFactory.INSTANCE, ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE, Collections.emptyMap(),
                 NoOpColumnFilterEvaluatorFactory.INSTANCE, NoOpColumnFilterEvaluatorFactory.INSTANCE,
-                NoOpWarningCollector.INSTANCE, null);
+                NoOpWarningCollector.INSTANCE, null, ColumnProjectorType.QUERY);
         AbstractBytesInputStream[] streams = new AbstractBytesInputStream[columnMetadata.getNumberOfColumns()];
         Arrays.fill(streams, DummyBytesInputStream.INSTANCE);
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
index eab824aa16..234f804a98 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/values/writer/NoOpColumnBatchWriter.java
@@ -33,16 +33,21 @@ public class NoOpColumnBatchWriter implements IColumnBatchWriter {
     @Override
     public void setPageZeroBuffer(ByteBuffer pageZeroBuffer, int numberOfColumns, int numberOfPrimaryKeys) {
-
+        // NoOp
     }
     @Override
-    public int writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException {
-        return 0;
+    public void writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException {
+        // NoOp
     }
     @Override
     public int writeColumns(PriorityQueue<IColumnValuesWriter> nonKeysColumnWriters) throws HyracksDataException {
         return 0;
     }
+
+    @Override
+    public void close() {
+        // NoOp
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
index 43aa8730dd..bbd8d87eda 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
@@ -92,6 +92,11 @@
<artifactId>hyracks-control-nc</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-cloud</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
@@ -104,5 +109,10 @@
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java
similarity index 53%
copy from hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
copy to hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java
index 641f704f5e..29258b5b70 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/ColumnProjectorType.java
@@ -18,34 +18,8 @@
  */
 package org.apache.hyracks.storage.am.lsm.btree.column.api.projection;
-/**
- * Gets information about the requested columns
- */
-public interface IColumnProjectionInfo {
-    /**
-     * @param ordinal position of the requested column
-     * @return column index given the ordinal number of the requested column
-     */
-    int getColumnIndex(int ordinal);
-
-    /**
-     * @return total number of requested columns
-     */
-    int getNumberOfProjectedColumns();
-
-    /**
-     * @return number of primary keys
-     */
-    int getNumberOfPrimaryKeys();
-
-    /**
-     * @param ordinal position of the filtered column
-     * @return column index given the ordinal number of the filtered column
-     */
-    int getFilteredColumnIndex(int ordinal);
-
-    /**
-     * @return number of filtered columns
-     */
-    int getNumberOfFilteredColumns();
+public enum ColumnProjectorType {
+    MERGE,
+    QUERY,
+    MODIFY
 }
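The enum gives the storage layer a cheap signal for why columns are being read. An illustrative dispatch over the three values (the real dispatch is CloudColumnIndexDiskCacheManager#createReadContext later in this patch; this helper is hypothetical):

    // Hypothetical summary of how each projector type is treated downstream.
    static String describe(ColumnProjectorType projectorType) {
        switch (projectorType) {
            case QUERY:
                return "query: record column accesses for sweep planning";
            case MODIFY:
                return "modify: keep requested (indexed) columns on local disk";
            case MERGE:
                return "merge: read all columns; no caching hints";
            default:
                throw new IllegalStateException("unknown type: " + projectorType);
        }
    }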
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
index 641f704f5e..b1bfe87546 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/projection/IColumnProjectionInfo.java
@@ -48,4 +48,9 @@ public interface IColumnProjectionInfo {
      * @return number of filtered columns
      */
     int getNumberOfFilteredColumns();
+
+    /**
+     * @return the type of {@link IColumnTupleProjector} that created this projection info
+     */
+    ColumnProjectorType getProjectorType();
 }
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
new file mode 100644
index 0000000000..38a1713eeb
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/CloudColumnIndexDiskCacheManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.sweeper.SweepContext;
+import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudColumnReadContext;
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.DefaultColumnReadContext;
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write.CloudColumnWriteContext;
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeper;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.disk.ISweepContext;
+
+/**
+ * The disk manager cannot be shared among different partitions as columns are
local to each partition.
+ * For example, column 9 in partition 0 corresponds to "salary" while column 9
in partition 1 corresponds to "age".
+ */
+public final class CloudColumnIndexDiskCacheManager implements
IColumnIndexDiskCacheManager {
+ private final IColumnTupleProjector sweepProjector;
+ private final IPhysicalDrive drive;
+ private final ColumnSweepPlanner planner;
+ private final ColumnSweeper sweeper;
+
+ public CloudColumnIndexDiskCacheManager(int numberOfPrimaryKeys,
IColumnTupleProjector sweepProjector,
+ IPhysicalDrive drive) {
+ this.sweepProjector = sweepProjector;
+ this.drive = drive;
+ planner = new ColumnSweepPlanner(numberOfPrimaryKeys,
System::nanoTime);
+ sweeper = new ColumnSweeper(numberOfPrimaryKeys);
+ }
+
+ @Override
+ public void activate(int numberOfColumns, List<ILSMDiskComponent>
diskComponents, IBufferCache bufferCache)
+ throws HyracksDataException {
+ planner.onActivate(numberOfColumns, diskComponents, sweepProjector,
bufferCache);
+ }
+
+ @Override
+ public IColumnWriteContext createWriteContext(int numberOfColumns,
LSMIOOperationType operationType) {
+ return new CloudColumnWriteContext(drive, planner, numberOfColumns);
+ }
+
+ @Override
+ public IColumnReadContext createReadContext(IColumnProjectionInfo
projectionInfo) {
+ ColumnProjectorType projectorType = projectionInfo.getProjectorType();
+ if (projectorType == ColumnProjectorType.QUERY) {
+ planner.access(projectionInfo, drive.hasSpace());
+ } else if (projectorType == ColumnProjectorType.MODIFY) {
+ planner.setIndexedColumns(projectionInfo);
+ // Requested (and indexed) columns will be persisted if space
permits
+ return DefaultColumnReadContext.INSTANCE;
+ }
+ return new CloudColumnReadContext(projectionInfo, drive,
planner.getPlanCopy());
+ }
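+
+    // Note: any other projector type (e.g., MERGE) also gets a CloudColumnReadContext,
+    // but without recording an access in the planner.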
+
+ @Override
+ public boolean isActive() {
+ return planner.isActive();
+ }
+
+ @Override
+ public boolean isSweepable() {
+ return true;
+ }
+
+ @Override
+ public boolean prepareSweepPlan() {
+ return planner.plan();
+ }
+
+ @Override
+ public long sweep(ISweepContext context) throws HyracksDataException {
+ SweepContext sweepContext = (SweepContext) context;
+ return sweeper.sweep(planner.getPlanCopy(), sweepContext,
sweepProjector);
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
new file mode 100644
index 0000000000..9fc60fa1a5
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/ColumnRanges.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud;
+
+import static
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeperUtil.EMPTY;
+import static
org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getColumnPageIndex;
+import static
org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil.getNumberOfPages;
+
+import java.util.BitSet;
+
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudColumnReadContext;
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweeper;
+import
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.longs.LongArrays;
+import it.unimi.dsi.fastutil.longs.LongComparator;
+
+/**
+ * Computes column offsets, lengths, and pages
+ */
+public final class ColumnRanges {
+ private static final LongComparator OFFSET_COMPARATOR =
+ (x, y) -> Integer.compare(getOffsetFromPair(x),
getOffsetFromPair(y));
+ private final int numberOfPrimaryKeys;
+
+ // For eviction
+ private final BitSet nonEvictablePages;
+
+ // For Query
+ private final BitSet evictablePages;
+ private final BitSet cloudOnlyPages;
+
+ private ColumnBTreeReadLeafFrame leafFrame;
+ private long[] offsetColumnIndexPairs;
+ private int[] lengths;
+ private int[] columnsOrder;
+ private int pageZeroId;
+
+ public ColumnRanges(int numberOfPrimaryKeys) {
+ this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+
+ offsetColumnIndexPairs = new long[0];
+ lengths = new int[0];
+ columnsOrder = new int[0];
+
+ nonEvictablePages = new BitSet();
+
+ evictablePages = new BitSet();
+ cloudOnlyPages = new BitSet();
+ }
+
+ /**
+ * @return number of primary keys
+ */
+ public int getNumberOfPrimaryKeys() {
+ return numberOfPrimaryKeys;
+ }
+
+ /**
+ * Reset ranges for initializing {@link ColumnSweepPlanner}
+ *
+ * @param leafFrame to compute the ranges for
+ */
+ public void reset(ColumnBTreeReadLeafFrame leafFrame) {
+ reset(leafFrame, EMPTY, EMPTY, EMPTY);
+ }
+
+ /**
+ * Reset column ranges for {@link ColumnSweeper}
+ *
+ * @param leafFrame to compute the ranges for
+ * @param plan eviction plan
+ */
+ public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet plan) {
+ reset(leafFrame, plan, EMPTY, EMPTY);
+ }
+
+ /**
+ * Reset ranges for {@link CloudColumnReadContext}
+ *
+ * @param leafFrame to compute the ranges for
+ * @param requestedColumns required columns
+ * @param evictableColumns columns that are or will be evicted
+ * @param cloudOnlyColumns locked columns that cannot be read from a local
disk
+ */
+ public void reset(ColumnBTreeReadLeafFrame leafFrame, BitSet
requestedColumns, BitSet evictableColumns,
+ BitSet cloudOnlyColumns) {
+ // Set leafFrame
+ this.leafFrame = leafFrame;
+        // Ensure array capacities (given the leafFrame's columns and pages)
+ init();
+
+        // Get the number of columns in the mega leaf node
+ int numberOfColumns = leafFrame.getNumberOfColumns();
+ for (int i = 0; i < numberOfColumns; i++) {
+ long offset = leafFrame.getColumnOffset(i);
+            // Pack the offset into the high 32 bits and the column index into the low 32 bits
+ offsetColumnIndexPairs[i] = (offset << 32) + i;
+ }
+
+        // Set an artificial offset to determine the last column's length
+ offsetColumnIndexPairs[numberOfColumns] =
(leafFrame.getMegaLeafNodeLengthInBytes() << 32) + numberOfColumns;
+
+ // Sort the pairs by offset (i.e., lowest offset first)
+ LongArrays.stableSort(offsetColumnIndexPairs, 0, numberOfColumns,
OFFSET_COMPARATOR);
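+        // Illustrative example (hypothetical numbers): for three columns at offsets
+        // {c0: 100, c1: 4000, c2: 900} in a 6000-byte mega leaf node, the sorted pairs
+        // are [(100, c0), (900, c2), (4000, c1), (6000, artificial)], so the loop below
+        // derives length(c0) = 800, length(c2) = 3100, and length(c1) = 2000.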
+
+ int columnOrdinal = 0;
+ for (int i = 0; i < numberOfColumns; i++) {
+ int columnIndex =
getColumnIndexFromPair(offsetColumnIndexPairs[i]);
+ int offset = getOffsetFromPair(offsetColumnIndexPairs[i]);
+ int nextOffset = getOffsetFromPair(offsetColumnIndexPairs[i + 1]);
+
+ // Compute the column's length in bytes (set 0 for PKs)
+ int length = columnIndex < numberOfPrimaryKeys ? 0 : nextOffset -
offset;
+ lengths[columnIndex] = length;
+
+ // Get start page ID (given the computed length above)
+ int startPageId = getColumnStartPageIndex(columnIndex);
+ // Get the number of pages (given the computed length above)
+ int numberOfPages = getColumnNumberOfPages(columnIndex);
+
+ if (columnIndex >= numberOfPrimaryKeys &&
requestedColumns.get(columnIndex)) {
+ // Set column index
+ columnsOrder[columnOrdinal++] = columnIndex;
+ // Compute cloud-only and evictable pages
+ setCloudOnlyAndEvictablePages(columnIndex, cloudOnlyColumns,
evictableColumns, startPageId,
+ numberOfPages);
+ // A requested column. Keep its pages as requested
+ continue;
+ }
+
+            // Mark the column's pages as non-evictable
+ for (int j = startPageId; j < startPageId + numberOfPages; j++) {
+ nonEvictablePages.set(j);
+ }
+ }
+
+        // Bound nonEvictablePages by setting a sentinel bit at the number of pages in the mega leaf node
+ nonEvictablePages.set(leafFrame.getMegaLeafNodeNumberOfPages());
+        // -1 indicates the end of the columns order
+ columnsOrder[columnOrdinal] = -1;
+ }
+
+ /**
+ * First page of a column
+ *
+ * @param columnIndex column index
+ * @return pageID
+ */
+ public int getColumnStartPageIndex(int columnIndex) {
+ int pageSize = leafFrame.getBuffer().capacity();
+ return getColumnPageIndex(leafFrame.getColumnOffset(columnIndex),
pageSize);
+ }
+
+ /**
+ * Length of a column in pages
+ *
+ * @param columnIndex column index
+ * @return number of pages
+ */
+ public int getColumnNumberOfPages(int columnIndex) {
+ int pageSize = leafFrame.getBuffer().capacity();
+ int numberOfPages = getNumberOfPages(getColumnLength(columnIndex),
pageSize);
+ return numberOfPages == 0 ? 1 : numberOfPages;
+ }
+
+ /**
+ * Length of a column in bytes
+ *
+ * @param columnIndex column index
+ * @return number of bytes
+ */
+ public int getColumnLength(int columnIndex) {
+ return lengths[columnIndex];
+ }
+
+ /**
+ * Returns true if the page is meant to be read from the cloud only
+ *
+ * @param pageId page ID
+     * @return true if the page should be read from the cloud, false otherwise
+ * @see #reset(ColumnBTreeReadLeafFrame, BitSet, BitSet, BitSet)
+ */
+ public boolean isCloudOnly(int pageId) {
+ // Compute the relative page ID for this mega leaf node
+ int relativePageId = pageId - pageZeroId;
+ return cloudOnlyPages.get(relativePageId);
+ }
+
+ /**
+ * Whether the page has been or will be evicted
+ *
+ * @param pageId page ID
+     * @return true if the page was or will be evicted, false otherwise
+ */
+ public boolean isEvictable(int pageId) {
+ int relativePageId = pageId - pageZeroId;
+ return evictablePages.get(relativePageId);
+ }
+
+ /**
+     * @return a {@link BitSet} of all non-evictable pages
+ */
+ public BitSet getNonEvictablePages() {
+ return nonEvictablePages;
+ }
+
+ /**
+     * @return the order in which columns should be read to ensure (semi-)sequential access,
+     * i.e., page X is read before page Y for all X < Y; the returned array is terminated by -1
+ */
+ public int[] getColumnsOrder() {
+ return columnsOrder;
+ }
+
+ private void init() {
+ int numberOfColumns = leafFrame.getNumberOfColumns();
+ offsetColumnIndexPairs =
LongArrays.ensureCapacity(offsetColumnIndexPairs, numberOfColumns + 1, 0);
+ lengths = IntArrays.ensureCapacity(lengths, numberOfColumns, 0);
+ columnsOrder = IntArrays.ensureCapacity(columnsOrder, numberOfColumns
+ 1, 0);
+ nonEvictablePages.clear();
+ evictablePages.clear();
+ cloudOnlyPages.clear();
+ pageZeroId = leafFrame.getPageId();
+ }
+
+ private static int getOffsetFromPair(long pair) {
+ return (int) (pair >> 32);
+ }
+
+ private static int getColumnIndexFromPair(long pair) {
+ return (int) pair;
+ }
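+
+    // A pair packs the offset into the high 32 bits and the column index into the low
+    // 32 bits; e.g., (offset = 900, columnIndex = 2) is stored as (900L << 32) + 2
+    // = 0x0000038400000002L, which the two accessors above undo.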
+
+ private void setCloudOnlyAndEvictablePages(int columnIndex, BitSet
cloudOnlyColumns, BitSet evictableColumns,
+ int startPageId, int numberOfPages) {
+ if (evictableColumns == EMPTY && cloudOnlyColumns == EMPTY) {
+ return;
+ }
+
+        // Find pages that are meant to be read from the cloud only or are evictable
+ boolean cloudOnly = cloudOnlyColumns.get(columnIndex);
+ boolean evictable = evictableColumns.get(columnIndex);
+ if (cloudOnly || evictable) {
+ for (int j = startPageId; j < startPageId + numberOfPages; j++) {
+ if (cloudOnly) {
+ cloudOnlyPages.set(j);
+ } else {
+ evictablePages.set(j);
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ int numberOfPages = leafFrame.getMegaLeafNodeNumberOfPages();
+ builder.append(" ");
+ for (int i = 0; i < numberOfPages; i++) {
+ builder.append(String.format("%02d", i));
+ builder.append(" ");
+ }
+
+ builder.append('\n');
+ for (int i = 0; i < leafFrame.getNumberOfColumns(); i++) {
+ builder.append(String.format("%03d", i));
+ builder.append(":");
+ int startPageId = getColumnStartPageIndex(i);
+ int columnPagesCount = getColumnNumberOfPages(i);
+ printColumnPages(builder, numberOfPages, startPageId,
columnPagesCount);
+ }
+
+ builder.append("nonEvictablePages: ");
+ builder.append(nonEvictablePages);
+ builder.append('\n');
+ builder.append("evictablePages: ");
+ builder.append(evictablePages);
+ builder.append('\n');
+ builder.append("cloudOnlyPages: ");
+ builder.append(cloudOnlyPages);
+
+ return builder.toString();
+ }
+
+ private void printColumnPages(StringBuilder builder, int numberOfPages,
int startPageId, int columnPagesCount) {
+ for (int i = 0; i < numberOfPages; i++) {
+ builder.append(" ");
+ if (i >= startPageId && i < startPageId + columnPagesCount) {
+ builder.append(1);
+ } else {
+ builder.append(0);
+ }
+ }
+ builder.append('\n');
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
new file mode 100644
index 0000000000..af69fe5f8d
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import static
org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext.readAndPersistPage;
+import static
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
+import static
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.buffercache.page.ISweepLockInfo;
+import org.apache.hyracks.control.nc.io.IOManager;
+import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnReadContext;
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepLockInfo;
+import
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+
+@NotThreadSafe
+public final class CloudColumnReadContext implements IColumnReadContext {
+ private final ColumnProjectorType operation;
+ private final IPhysicalDrive drive;
+ private final BitSet plan;
+ private final BitSet cloudOnlyColumns;
+ private final ColumnRanges columnRanges;
+ private final CloudMegaPageReadContext columnCtx;
+ private final List<ICachedPage> pinnedPages;
+ private final BitSet projectedColumns;
+
+ public CloudColumnReadContext(IColumnProjectionInfo projectionInfo,
IPhysicalDrive drive, BitSet plan) {
+ this.operation = projectionInfo.getProjectorType();
+ this.drive = drive;
+ this.plan = plan;
+ columnRanges = new
ColumnRanges(projectionInfo.getNumberOfPrimaryKeys());
+ cloudOnlyColumns = new BitSet();
+ columnCtx = new CloudMegaPageReadContext(operation, columnRanges,
drive);
+ pinnedPages = new ArrayList<>();
+ projectedColumns = new BitSet();
+ if (operation == QUERY || operation == MODIFY) {
+ for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns();
i++) {
+ int columnIndex = projectionInfo.getColumnIndex(i);
+ if (columnIndex >= 0) {
+ projectedColumns.set(columnIndex);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onPin(ICachedPage page) {
+ CloudCachedPage cloudPage = (CloudCachedPage) page;
+ ISweepLockInfo lockTest = cloudPage.beforeRead();
+ if (lockTest.isLocked()) {
+ ColumnSweepLockInfo lockedColumns = (ColumnSweepLockInfo) lockTest;
+ lockedColumns.getLockedColumns(cloudOnlyColumns);
+ }
+ }
+
+ @Override
+ public void onUnpin(ICachedPage page) {
+ CloudCachedPage cloudPage = (CloudCachedPage) page;
+ cloudPage.afterRead();
+ }
+
+ @Override
+ public boolean isNewPage() {
+ return false;
+ }
+
+ @Override
+ public boolean incrementStats() {
+ return true;
+ }
+
+ @Override
+ public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle
fileHandle, BufferCacheHeaderHelper header,
+ CachedPage cPage) throws HyracksDataException {
+        // Page zero is always persisted, provided that free disk space permits
+ return readAndPersistPage(ioManager, fileHandle, header, cPage,
drive.hasSpace());
+ }
+
+ @Override
+ public ICachedPage pinNext(ColumnBTreeReadLeafFrame leafFrame,
IBufferCache bufferCache, int fileId)
+ throws HyracksDataException {
+ // TODO do we support prefetching?
+ ICachedPage nextPage =
bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId,
leafFrame.getNextLeaf()), this);
+ release(bufferCache);
+ bufferCache.unpin(leafFrame.getPage(), this);
+ leafFrame.setPage(nextPage);
+ return nextPage;
+ }
+
+ @Override
+ public void prepareColumns(ColumnBTreeReadLeafFrame leafFrame,
IBufferCache bufferCache, int fileId)
+ throws HyracksDataException {
+ if (leafFrame.getTupleCount() == 0) {
+ return;
+ }
+
+ // TODO handle prefetch if supported
+
+ columnRanges.reset(leafFrame, projectedColumns, plan,
cloudOnlyColumns);
+ int pageZeroId = leafFrame.getPageId();
+ int[] columnsOrders = columnRanges.getColumnsOrder();
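+        // Illustrative example (hypothetical numbers): if columnsOrders = {3, 5, -1},
+        // column 3 starts at page 2 spanning 3 pages, and column 5 starts at page 5
+        // spanning 3 pages, the two ranges are coalesced into a single pin of pages 2..7.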
+ int i = 0;
+ int columnIndex = columnsOrders[i];
+ while (columnIndex > -1) {
+ if (columnIndex < columnRanges.getNumberOfPrimaryKeys()) {
+ columnIndex = columnsOrders[++i];
+ continue;
+ }
+
+ int startPageId =
columnRanges.getColumnStartPageIndex(columnIndex);
+            // numberOfPages will grow if the next column's pages are contiguous with this column's pages
+ int numberOfPages =
columnRanges.getColumnNumberOfPages(columnIndex);
+
+ // Advance to the next column to check if it has contiguous pages
+ columnIndex = columnsOrders[++i];
+ while (columnIndex > -1) {
+ // Get the next column's start page ID
+ int nextStartPageId =
columnRanges.getColumnStartPageIndex(columnIndex);
+ if (nextStartPageId > startPageId + numberOfPages + 1) {
+ // The next startPageId is not contiguous, stop.
+ break;
+ }
+
+                // One past the last page of the next column
+ int nextLastPage = nextStartPageId +
columnRanges.getColumnNumberOfPages(columnIndex);
+ // The next column's pages are contiguous. Combine its ranges
with the previous one.
+ numberOfPages = nextLastPage - startPageId;
+ // Advance to the next column
+ columnIndex = columnsOrders[++i];
+ }
+
+ columnCtx.prepare(numberOfPages);
+ pin(bufferCache, fileId, pageZeroId, startPageId, numberOfPages);
+ }
+ }
+
+ private void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int
start, int numOfRequestedPages)
+ throws HyracksDataException {
+ for (int i = start; i < start + numOfRequestedPages; i++) {
+ long dpid = BufferedFileHandle.getDiskPageId(fileId, pageZeroId +
i);
+ pinnedPages.add(bufferCache.pin(dpid, columnCtx));
+ }
+ }
+
+ @Override
+ public void release(IBufferCache bufferCache) throws HyracksDataException {
+ // Release might differ in the future if prefetching is supported
+ close(bufferCache);
+ }
+
+ @Override
+ public void close(IBufferCache bufferCache) throws HyracksDataException {
+ release(pinnedPages, bufferCache, columnCtx);
+ columnCtx.close();
+ }
+
+ private static void release(List<ICachedPage> pinnedPages, IBufferCache
bufferCache, IBufferCacheReadContext ctx)
+ throws HyracksDataException {
+ for (int i = 0; i < pinnedPages.size(); i++) {
+ bufferCache.unpin(pinnedPages.get(i), ctx);
+ }
+ pinnedPages.clear();
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
new file mode 100644
index 0000000000..0f0b0b9506
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import static
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
+import static
org.apache.hyracks.storage.common.buffercache.IBufferCache.RESERVED_HEADER_BYTES;
+import static
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import
org.apache.hyracks.cloud.buffercache.context.BufferCacheCloudReadContextUtil;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.control.nc.io.IOManager;
+import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+@NotThreadSafe
+final class CloudMegaPageReadContext implements IBufferCacheReadContext {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final ColumnProjectorType operation;
+ private final ColumnRanges columnRanges;
+ private final IPhysicalDrive drive;
+ private int numberOfContiguousPages;
+ private int pageCounter;
+ private InputStream gapStream;
+
+ CloudMegaPageReadContext(ColumnProjectorType operation, ColumnRanges
columnRanges, IPhysicalDrive drive) {
+ this.operation = operation;
+ this.columnRanges = columnRanges;
+ this.drive = drive;
+ }
+
+ public void prepare(int numberOfContiguousPages) throws
HyracksDataException {
+ close();
+ this.numberOfContiguousPages = numberOfContiguousPages;
+ pageCounter = 0;
+ }
+
+ @Override
+ public void onPin(ICachedPage page) throws HyracksDataException {
+ CloudCachedPage cachedPage = (CloudCachedPage) page;
+ if (gapStream != null && cachedPage.skipCloudStream()) {
+ /*
+ * This page is requested but the buffer cache has a valid copy in
memory. Also, the page itself was
+ * requested to be read from the cloud. Since this page is valid,
no buffer cache read() will be performed.
+ * As the buffer cache read() is also responsible for persisting
the bytes read from the cloud, we can end
+ * up writing the bytes of this page in the position of another
page. Therefore, we should skip the bytes
+ * for this particular page to avoid placing the bytes of this
page into another page's position.
+ */
+ try {
+ long remaining = cachedPage.getCompressedPageSize();
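+                // skip() may skip fewer bytes than requested, hence the loop below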
+ while (remaining > 0) {
+ remaining -= gapStream.skip(remaining);
+ }
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
+
+ @Override
+ public void onUnpin(ICachedPage page) {
+ // NoOp
+ }
+
+ @Override
+ public boolean isNewPage() {
+ return false;
+ }
+
+ @Override
+ public boolean incrementStats() {
+ return true;
+ }
+
+ @Override
+ public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle
fileHandle, BufferCacheHeaderHelper header,
+ CachedPage cPage) throws HyracksDataException {
+ boolean empty = BufferCacheCloudReadContextUtil.isEmpty(header);
+ int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
+ boolean cloudOnly = columnRanges.isCloudOnly(pageId);
+ ByteBuffer buffer;
+ if (empty || cloudOnly || gapStream != null) {
+ boolean evictable = columnRanges.isEvictable(pageId);
+            /*
+             * Persist iff all the following conditions are satisfied:
+             * - The page is empty
+             * - The page is not being evicted (i.e., 'cloudOnly' is false)
+             * - The page is not planned for eviction (i.e., 'evictable' is false)
+             * - The operation is not a merge operation (the component will be deleted anyway)
+             * - The disk has space
+             *
+             * Note: 'empty' can be false while 'cloudOnly' is true. We cannot read from disk as the page can be
+             * evicted at any moment. In other words, the sweeper told us that it is going to evict this page; hence
+             * 'cloudOnly' is true.
+             */
+            boolean persist = empty && !cloudOnly && !evictable && operation != MERGE && drive.hasSpace();
+ buffer = readFromStream(ioManager, fileHandle, header, cPage,
persist);
+ buffer.position(RESERVED_HEADER_BYTES);
+ } else {
+ /*
+             * Here we can find a page that is planned for eviction but has not been evicted yet
+             * (i.e., empty = false). This can happen if the cursor is at a point the sweeper hasn't
+             * reached yet (i.e., cloudOnly = false).
+ */
+ buffer = DEFAULT.processHeader(ioManager, fileHandle, header,
cPage);
+ }
+
+ if (++pageCounter == numberOfContiguousPages) {
+ close();
+ }
+
+ return buffer;
+ }
+
+ void close() throws HyracksDataException {
+ if (gapStream != null) {
+ try {
+ gapStream.close();
+ gapStream = null;
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
+
+ private ByteBuffer readFromStream(IOManager ioManager, BufferedFileHandle
fileHandle,
+ BufferCacheHeaderHelper header, CachedPage cPage, boolean persist)
throws HyracksDataException {
+ InputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
+ ByteBuffer buffer = header.getBuffer();
+ buffer.position(0);
+ try {
+ while (buffer.remaining() != 0) {
+ int length = stream.read(buffer.array(), buffer.position(),
buffer.remaining());
+ if (length < 0) {
+ throw new IllegalStateException("Stream should not be
empty!");
+ }
+ buffer.position(buffer.position() + length);
+ }
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+
+ buffer.flip();
+
+ if (persist) {
+ long offset = cPage.getCompressedPageOffset();
+ ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+ BufferCacheCloudReadContextUtil.persist(cloudIOManager,
fileHandle.getFileHandle(), buffer, offset);
+ }
+
+ return buffer;
+ }
+
+ private InputStream getOrCreateStream(IOManager ioManager,
BufferedFileHandle fileHandle, CachedPage cPage)
+ throws HyracksDataException {
+ if (gapStream != null) {
+ return gapStream;
+ }
+
+ LOGGER.info("Cloud stream read for {} pages", numberOfContiguousPages
- pageCounter);
+ int requiredNumOfPages = numberOfContiguousPages - pageCounter;
+ long offset = cPage.getCompressedPageOffset();
+ int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
+ long length = fileHandle.getPagesTotalSize(pageId, requiredNumOfPages);
+
+ ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
+ gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(),
offset, length);
+
+ return gapStream;
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
new file mode 100644
index 0000000000..dcde8336a0
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/write/CloudColumnWriteContext.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.write;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import
org.apache.hyracks.cloud.buffercache.context.DefaultCloudOnlyWriteContext;
+import org.apache.hyracks.control.nc.io.IOManager;
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.IColumnWriteContext;
+import
org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ColumnSweepPlanner;
+import
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheWriteContext;
+import
org.apache.hyracks.storage.common.buffercache.context.write.DefaultBufferCacheWriteContext;
+import org.apache.hyracks.storage.common.disk.IPhysicalDrive;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.ints.IntSet;
+
+public final class CloudColumnWriteContext implements IColumnWriteContext {
+ private static final int INITIAL_NUMBER_OF_COLUMNS = 32;
+ private final IPhysicalDrive drive;
+ private final ColumnSweepPlanner planner;
+ private final BitSet plan;
+ private final IntSet indexedColumns;
+ private int[] sizes;
+ private int numberOfColumns;
+ private IBufferCacheWriteContext currentContext;
+ /**
+     * writeLocallyAndSwitchToCloudOnly = true means the next call to write() will persist the page locally and
+     * then switch to the cloud-only writer (i.e., the following pages will be written to the cloud but not locally).
+     * <p>
+     * 'writeLocallyAndSwitchToCloudOnly' is set to 'true' iff the previous column's last page should be persisted
+     * AND it is 'overlapping' with this column's first page.
+ */
+ private boolean writeLocallyAndSwitchToCloudOnly;
+
+ public CloudColumnWriteContext(IPhysicalDrive drive, ColumnSweepPlanner
planner, int numberOfColumns) {
+ this.drive = drive;
+ this.planner = planner;
+ this.plan = planner.getPlanCopy();
+ this.indexedColumns = planner.getIndexedColumnsCopy();
+ int initialLength =
+ planner.getNumberOfPrimaryKeys() == numberOfColumns ?
INITIAL_NUMBER_OF_COLUMNS : numberOfColumns;
+ sizes = new int[initialLength];
+ // Number of columns is not known during the flush operation
+ this.numberOfColumns = 0;
+ currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+ }
+
+ @Override
+ public void startWritingColumn(int columnIndex, boolean overlapping) {
+ if (drive.hasSpace() || indexedColumns.contains(columnIndex)) {
+            // Persist the current column locally (free disk space permits, or the column is indexed)
+ currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+ } else if (plan.get(columnIndex)) {
+ // This column was planned for eviction, do not persist.
+ if (overlapping && currentContext ==
DefaultBufferCacheWriteContext.INSTANCE) {
+ // The previous column's last page should be persisted AND it
is overlapping with the current column's
+ // first page. Persist the first page locally and switch to
cloud-only writer.
+ writeLocallyAndSwitchToCloudOnly = true;
+ } else {
+ // The previous column's last page is not overlapping. Switch
to cloud-only writer (if not already)
+ currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+ }
+ } else {
+ // Local drive is pressured. Write to cloud only.
+ currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+ }
+ }
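+
+    // Summary of the dispatch above (informal): free disk space or an indexed column keeps
+    // the default (locally persisting) writer; a column planned for eviction switches to
+    // the cloud-only writer, deferring the switch by one page when it overlaps a locally
+    // persisted page; otherwise the pressured drive forces cloud-only writes.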
+
+ @Override
+ public void endWritingColumn(int columnIndex, int size) {
+ ensureCapacity(columnIndex);
+ sizes[columnIndex] = Math.max(sizes[columnIndex], size);
+ }
+
+ @Override
+ public void columnsPersisted() {
+ // Set the default writer context to persist pageZero and the interior
nodes' pages locally
+ currentContext = DefaultBufferCacheWriteContext.INSTANCE;
+ writeLocallyAndSwitchToCloudOnly = false;
+ }
+
+ @Override
+ public void close() {
+ // Report the sizes of the written columns
+ planner.adjustColumnSizes(sizes, numberOfColumns);
+ }
+
+    /*
+     * ************************************************************************************************
+     * WRITE methods
+     * ************************************************************************************************
+     */
+
+ @Override
+ public int write(IOManager ioManager, IFileHandle handle, long offset,
ByteBuffer data)
+ throws HyracksDataException {
+ int writtenBytes = currentContext.write(ioManager, handle, offset,
data);
+ switchIfNeeded();
+ return writtenBytes;
+ }
+
+ @Override
+ public long write(IOManager ioManager, IFileHandle handle, long offset,
ByteBuffer[] data)
+ throws HyracksDataException {
+ long writtenBytes = currentContext.write(ioManager, handle, offset,
data);
+ switchIfNeeded();
+ return writtenBytes;
+ }
+
+    /*
+     * ************************************************************************************************
+     * helper methods
+     * ************************************************************************************************
+     */
+
+ private void ensureCapacity(int columnIndex) {
+ int length = sizes.length;
+ if (columnIndex >= length) {
+ sizes = IntArrays.grow(sizes, columnIndex + 1);
+ }
+
+ numberOfColumns = Math.max(numberOfColumns, columnIndex + 1);
+ }
+
+ private void switchIfNeeded() {
+ if (writeLocallyAndSwitchToCloudOnly) {
+ // Switch to cloud-only writer
+ currentContext = DefaultCloudOnlyWriteContext.INSTANCE;
+ writeLocallyAndSwitchToCloudOnly = false;
+ }
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java
new file mode 100644
index 0000000000..ed83e84531
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepLockInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.cloud.buffercache.page.ISweepLockInfo;
+
+public final class ColumnSweepLockInfo implements ISweepLockInfo {
+ private final BitSet lockedColumns;
+
+ public ColumnSweepLockInfo() {
+ lockedColumns = new BitSet();
+ }
+
+ /**
+ * Reset the lock with plan's columns
+ *
+ * @param plan contains the columns to be locked
+ */
+ void reset(BitSet plan) {
+ lockedColumns.clear();
+ lockedColumns.or(plan);
+ }
+
+ /**
+ * Clear and set the locked columns in the provided {@link BitSet}
+ *
+ * @param lockedColumns used to get the locked columns
+ */
+ public void getLockedColumns(BitSet lockedColumns) {
+ lockedColumns.clear();
+ lockedColumns.or(this.lockedColumns);
+ }
+
+ @Override
+ public boolean isLocked() {
+ return true;
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
new file mode 100644
index 0000000000..9549f4f7c6
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlanner.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static
org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils.isMergedComponent;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.longs.LongArrays;
+
+public final class ColumnSweepPlanner {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private static final double SIZE_WEIGHT = 0.3d;
+ private static final double LAST_ACCESS_WEIGHT = 1.0d - SIZE_WEIGHT;
+ private static final double INITIAL_PUNCHABLE_THRESHOLD = 0.7d;
+ private static final double PUNCHABLE_THRESHOLD_DECREMENT = 0.7d;
+ private static final int MAX_ITERATION_COUNT = 5;
+ private static final int REEVALUATE_PLAN_THRESHOLD = 50;
+
+ private final AtomicBoolean active;
+ private final int numberOfPrimaryKeys;
+ private final BitSet plan;
+ private final BitSet reevaluatedPlan;
+ private final IntSet indexedColumns;
+ private final ISweepClock clock;
+ private int numberOfColumns;
+ private long lastAccess;
+ private int maxSize;
+ private int[] sizes;
+ private long[] lastAccesses;
+
+ private double punchableThreshold;
+ private long lastSweepTs;
+ private int numberOfSweptColumns;
+ private int numberOfCloudRequests;
+
+ public ColumnSweepPlanner(int numberOfPrimaryKeys, ISweepClock clock) {
+ this.clock = clock;
+ active = new AtomicBoolean(false);
+ this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+ sizes = new int[0];
+ lastAccesses = new long[0];
+ indexedColumns = new IntOpenHashSet();
+ plan = new BitSet();
+ reevaluatedPlan = new BitSet();
+ punchableThreshold = INITIAL_PUNCHABLE_THRESHOLD;
+ }
+
+ public boolean isActive() {
+ return active.get();
+ }
+
+ public int getNumberOfPrimaryKeys() {
+ return numberOfPrimaryKeys;
+ }
+
+ public void onActivate(int numberOfColumns, List<ILSMDiskComponent>
diskComponents,
+ IColumnTupleProjector sweepProjector, IBufferCache bufferCache)
throws HyracksDataException {
+ resizeStatsArrays(numberOfColumns);
+ setInitialSizes(diskComponents, sweepProjector, bufferCache);
+ active.set(true);
+ }
+
+ public void setIndexedColumns(IColumnProjectionInfo projectionInfo) {
+ indexedColumns.clear();
+ for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns(); i++)
{
+ int columnIndex = projectionInfo.getColumnIndex(i);
+ indexedColumns.add(columnIndex);
+ }
+ }
+
+ public IntSet getIndexedColumnsCopy() {
+ return new IntOpenHashSet(indexedColumns);
+ }
+
+ public synchronized void access(IColumnProjectionInfo projectionInfo,
boolean hasSpace) {
+ resetPlanIfNeeded(hasSpace);
+ long accessTime = clock.getCurrentTime();
+ lastAccess = accessTime;
+ int numberOfColumns = projectionInfo.getNumberOfProjectedColumns();
+ boolean requireCloudAccess = false;
+ for (int i = 0; i < numberOfColumns; i++) {
+ int columnIndex = projectionInfo.getColumnIndex(i);
+ if (columnIndex >= 0) {
+ // columnIndex can be -1 when accessing a non-existing column
(i.e., not known by the schema)
+ lastAccesses[columnIndex] = accessTime;
+ requireCloudAccess |= numberOfSweptColumns > 0 &&
plan.get(columnIndex);
+ }
+ }
+
+ numberOfCloudRequests += requireCloudAccess ? 1 : 0;
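+        // numberOfCloudRequests feeds resetPlanIfNeeded(): once REEVALUATE_PLAN_THRESHOLD
+        // accesses required a cloud read (and the disk has space), the plan is re-evaluated.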
+ }
+
+ public synchronized void adjustColumnSizes(int[] newSizes, int
numberOfColumns) {
+ resizeStatsArrays(numberOfColumns);
+ for (int i = 0; i < numberOfColumns; i++) {
+ int newSize = newSizes[i];
+ sizes[i] = Math.max(sizes[i], newSize);
+ maxSize = Math.max(maxSize, newSize);
+ }
+ }
+
+ public synchronized boolean plan() {
+ plan.clear();
+ int numberOfEvictableColumns = 0;
+ int iter = 0;
+        // Calculate weights: ensure the plan contains new columns that have never been swept
+ while (iter < MAX_ITERATION_COUNT && numberOfEvictableColumns <
numberOfColumns) {
+ if (numberOfEvictableColumns > 0) {
+ // Do not reiterate if we found columns to evict
+ break;
+ }
+
+ // Find evictable columns
+ numberOfEvictableColumns += findEvictableColumns(plan);
+
+ // The next iteration/plan will be more aggressive
+ punchableThreshold *= PUNCHABLE_THRESHOLD_DECREMENT;
+ iter++;
+ }
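+        // With the constants above, the threshold decays geometrically from its initial
+        // value: 0.7, 0.49, 0.343, ... for at most MAX_ITERATION_COUNT iterations.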
+ // Register the plan time
+ lastSweepTs = clock.getCurrentTime();
+ // Add the number of evictable columns
+ numberOfSweptColumns += numberOfEvictableColumns;
+ if (numberOfEvictableColumns > 0) {
+ LOGGER.info("Planning to evict {} columns. The evictable columns
are {}", numberOfEvictableColumns, plan);
+ return true;
+ }
+
+ LOGGER.info("Couldn't find columns to evict after {} iteration", iter);
+ return false;
+ }
+
+ public synchronized BitSet getPlanCopy() {
+ return (BitSet) plan.clone();
+ }
+
+ private double getWeight(int i, IntSet indexedColumns, int
numberOfPrimaryKeys) {
+ if (i < numberOfPrimaryKeys || indexedColumns.contains(i)) {
+ return -1.0;
+ }
+
+ double sizeWeight = sizes[i] / (double) maxSize * SIZE_WEIGHT;
+        double lastAccessWeight = (lastAccess - lastAccesses[i]) / (double) lastAccess * LAST_ACCESS_WEIGHT;
+
+        return sizeWeight + lastAccessWeight;
+ }
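+
+    // Worked example (hypothetical numbers): for sizes[i] = 50, maxSize = 100,
+    // lastAccesses[i] = 100, and lastAccess = 200, the weight is
+    // 0.5 * 0.3 + 0.5 * 0.7 = 0.5; such a column becomes evictable once the
+    // punchable threshold decays to 0.49 (see plan()).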
+
+ private void resizeStatsArrays(int numberOfColumns) {
+ sizes = IntArrays.ensureCapacity(sizes, numberOfColumns);
+ lastAccesses = LongArrays.ensureCapacity(lastAccesses,
numberOfColumns);
+ this.numberOfColumns = numberOfColumns - numberOfPrimaryKeys;
+ }
+
+ private void setInitialSizes(List<ILSMDiskComponent> diskComponents,
IColumnTupleProjector sweepProjector,
+ IBufferCache bufferCache) throws HyracksDataException {
+ // This runs when activating an index (no need to synchronize on the
opTracker)
+ if (diskComponents.isEmpty()) {
+ return;
+ }
+
+ IColumnProjectionInfo columnProjectionInfo =
+ ColumnSweeperUtil.createColumnProjectionInfo(diskComponents,
sweepProjector);
+ ILSMDiskComponent latestComponent = diskComponents.get(0);
+ ILSMDiskComponent oldestComponent =
diskComponents.get(diskComponents.size() - 1);
+ ColumnBTreeReadLeafFrame leafFrame =
ColumnSweeperUtil.createLeafFrame(columnProjectionInfo, latestComponent);
+ ColumnRanges ranges = new
ColumnRanges(columnProjectionInfo.getNumberOfPrimaryKeys());
+ // Get the column sizes from the freshest component, which has the
columns of the most recent schema
+ setColumnSizes(latestComponent, leafFrame, ranges, bufferCache);
+ if (isMergedComponent(oldestComponent.getId())) {
+ // Get the column sizes from the oldest merged component, which
probably has the largest columns
+ setColumnSizes(oldestComponent, leafFrame, ranges, bufferCache);
+ }
+ }
+
+ private void setColumnSizes(ILSMDiskComponent diskComponent,
ColumnBTreeReadLeafFrame leafFrame,
+ ColumnRanges ranges, IBufferCache bufferCache) throws
HyracksDataException {
+ ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+ long dpid = BufferedFileHandle.getDiskPageId(columnBTree.getFileId(),
columnBTree.getBulkloadLeafStart());
+ ICachedPage page = bufferCache.pin(dpid);
+ try {
+ leafFrame.setPage(page);
+ ranges.reset(leafFrame);
+ for (int i = 0; i < leafFrame.getNumberOfColumns(); i++) {
+ sizes[i] = Math.max(sizes[i], ranges.getColumnLength(i));
+ maxSize = Math.max(maxSize, sizes[i]);
+ }
+ } finally {
+ bufferCache.unpin(page);
+ }
+ }
+
+ private int findEvictableColumns(BitSet plan) {
+ int numberOfEvictableColumns = 0;
+ for (int i = 0; i < sizes.length; i++) {
+ if (!plan.get(i) && getWeight(i, indexedColumns,
numberOfPrimaryKeys) >= punchableThreshold) {
+                // The column's weight reached the punchable threshold; include it in the eviction plan.
+ plan.set(i);
+ numberOfEvictableColumns++;
+ }
+ }
+
+ return numberOfEvictableColumns;
+ }
+
+ private void resetPlanIfNeeded(boolean hasSpace) {
+ if (!hasSpace || numberOfCloudRequests < REEVALUATE_PLAN_THRESHOLD) {
+ return;
+ }
+
+ numberOfCloudRequests = 0;
+ reevaluatedPlan.clear();
+ int numberOfEvictableColumns = findEvictableColumns(reevaluatedPlan);
+ for (int i = 0; i < numberOfEvictableColumns; i++) {
+ int columnIndex = reevaluatedPlan.nextSetBit(i);
+ if (!plan.get(columnIndex)) {
+                // The current plan is stale (it is missing a newly evictable column). Invalidate!
+ plan.clear();
+ plan.or(reevaluatedPlan);
+ LOGGER.info("Re-planning to evict {} columns. The newly
evictable columns are {}",
+ numberOfEvictableColumns, plan);
+ break;
+ }
+ }
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
new file mode 100644
index 0000000000..4932bf0c7f
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeper.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState.READABLE_UNWRITABLE;
+import static
org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils.isMergedComponent;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
+import org.apache.hyracks.cloud.cache.unit.IndexUnit;
+import org.apache.hyracks.cloud.sweeper.SweepContext;
+import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import
org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.LSMColumnBTree;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import
org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.annotations.CriticalPath;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public final class ColumnSweeper {
+ private static final Logger LOGGER = LogManager.getLogger();
+ private final ColumnSweepLockInfo lockedColumns;
+ private final ColumnRanges ranges;
+ private final List<ILSMDiskComponent> sweepableComponents;
+
+ public ColumnSweeper(int numberOfPrimaryKeys) {
+ lockedColumns = new ColumnSweepLockInfo();
+ ranges = new ColumnRanges(numberOfPrimaryKeys);
+ sweepableComponents = new ArrayList<>();
+ }
+
+ public long sweep(BitSet plan, SweepContext context, IColumnTupleProjector
sweepProjector)
+ throws HyracksDataException {
+ IndexUnit indexUnit = context.getIndexUnit();
+ LSMColumnBTree lsmColumnBTree = (LSMColumnBTree) indexUnit.getIndex();
+ IColumnProjectionInfo projectionInfo =
captureSweepableComponents(lsmColumnBTree, sweepProjector);
+ if (projectionInfo == null) {
+ // no sweepable components
+ return 0L;
+ }
+
+ LOGGER.info("Sweeping {}", lsmColumnBTree);
+ ILSMDiskComponent latestComponent = sweepableComponents.get(0);
+ ColumnBTreeReadLeafFrame leafFrame =
ColumnSweeperUtil.createLeafFrame(projectionInfo, latestComponent);
+ IBufferCacheReadContext bcOpCtx = SweepBufferCacheReadContext.INSTANCE;
+ lockedColumns.reset(plan);
+ long freedSpace = 0L;
+ for (int i = 0; i < sweepableComponents.size(); i++) {
+ if (context.stopSweeping()) {
+ // Exit as the index is being dropped
+ return 0L;
+ }
+
+ boolean failed = false;
+ // Components are entered one at a time to allow components to be
merged and deactivated
+ ILSMDiskComponent diskComponent = enterAndGetComponent(i,
lsmColumnBTree);
+ // If diskComponent is null, that means it is not a viable
candidate anymore
+ if (diskComponent != null) {
+ try {
+ freedSpace += sweepDiskComponent(leafFrame, diskComponent,
plan, context, bcOpCtx);
+ } catch (Throwable e) {
+ failed = true;
+ throw e;
+ } finally {
+ exitComponent(diskComponent, lsmColumnBTree, failed);
+ }
+ }
+ }
+
+ LOGGER.info("Swept {} components and freed {} from disk",
sweepableComponents.size(),
+ StorageUtil.toHumanReadableSize(freedSpace));
+ return freedSpace;
+ }
+
+ @CriticalPath
+ private IColumnProjectionInfo captureSweepableComponents(LSMColumnBTree
lsmColumnBTree,
+ IColumnTupleProjector sweepProjector) throws HyracksDataException {
+ ILSMOperationTracker opTracker = lsmColumnBTree.getOperationTracker();
+ sweepableComponents.clear();
+ synchronized (opTracker) {
+ List<ILSMDiskComponent> diskComponents =
lsmColumnBTree.getDiskComponents();
+ for (int i = 0; i < diskComponents.size(); i++) {
+ ILSMDiskComponent diskComponent = diskComponents.get(i);
+ /*
+ * Get components that are only in READABLE_UNWRITABLE state.
Components that are currently being
+ * merged should not be swept as they will be deleted anyway.
+ * Also, only sweep merged components as flushed components
are relatively smaller and should be
+ * merged eventually. So, it is preferable to read everything
locally when merging flushed components.
+ * TODO should we sweep flushed components?
+ */
+ if (isMergedComponent(diskComponent.getId()) &&
diskComponent.getState() == READABLE_UNWRITABLE
+ && diskComponent.getComponentSize() > 0) {
+ // The component is a good candidate to be swept
+ sweepableComponents.add(diskComponent);
+ }
+ }
+
+ if (sweepableComponents.isEmpty()) {
+ // No sweepable components
+ return null;
+ }
+
+ return
ColumnSweeperUtil.createColumnProjectionInfo(sweepableComponents,
sweepProjector);
+ }
+ }
+
+ private ILSMDiskComponent enterAndGetComponent(int index, LSMColumnBTree
lsmColumnBTree)
+ throws HyracksDataException {
+ ILSMDiskComponent diskComponent = sweepableComponents.get(index);
+ synchronized (lsmColumnBTree.getOperationTracker()) {
+ // Make sure the component is still in READABLE_UNWRITABLE state
+ if (diskComponent.getState() == READABLE_UNWRITABLE
+ &&
diskComponent.threadEnter(LSMOperationType.DISK_COMPONENT_SCAN, false)) {
+ return diskComponent;
+ }
+ }
+
+ // the component is not a viable candidate anymore
+ return null;
+ }
+
+ private void exitComponent(ILSMDiskComponent diskComponent, LSMColumnBTree
lsmColumnBTree, boolean failed)
+ throws HyracksDataException {
+ synchronized (lsmColumnBTree.getOperationTracker()) {
+ diskComponent.threadExit(LSMOperationType.DISK_COMPONENT_SCAN,
failed, false);
+ }
+ }
+
+    private long sweepDiskComponent(ColumnBTreeReadLeafFrame leafFrame, ILSMDiskComponent diskComponent, BitSet plan,
+            SweepContext context, IBufferCacheReadContext bcOpCtx) throws HyracksDataException {
+        long dpid = getFirstPageId(diskComponent);
+        int fileId = BufferedFileHandle.getFileId(dpid);
+        int nextPageId = BufferedFileHandle.getPageId(dpid);
+        int freedSpace = 0;
+        context.open(fileId);
+        try {
+            while (nextPageId >= 0) {
+                if (context.stopSweeping()) {
+                    // Exit as the index is being dropped
+                    return 0L;
+                }
+                CloudCachedPage page0 = context.pin(BufferedFileHandle.getDiskPageId(fileId, nextPageId), bcOpCtx);
+                boolean columnsLocked = false;
+                try {
+                    leafFrame.setPage(page0);
+                    nextPageId = leafFrame.getNextLeaf();
+                    columnsLocked = page0.trySweepLock(lockedColumns);
+                    if (columnsLocked) {
+                        leafFrame.setPage(page0);
+                        ranges.reset(leafFrame, plan);
+                        freedSpace += punchHoles(context, leafFrame);
+                    }
+                } finally {
+                    if (columnsLocked) {
+                        page0.sweepUnlock();
+                    }
+                    context.unpin(page0, bcOpCtx);
+                }
+            }
+        } finally {
+            context.close();
+        }
+
+        return freedSpace;
+    }
+
+    private long getFirstPageId(ILSMDiskComponent diskComponent) {
+        ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+        int fileId = columnBTree.getFileId();
+        int firstPage = columnBTree.getBulkloadLeafStart();
+        return BufferedFileHandle.getDiskPageId(fileId, firstPage);
+    }
+
+    private int punchHoles(SweepContext context, ColumnBTreeReadLeafFrame leafFrame) throws HyracksDataException {
+        int freedSpace = 0;
+        int numberOfPages = leafFrame.getMegaLeafNodeNumberOfPages();
+        int pageZeroId = leafFrame.getPageId();
+
+        // Start from 1 as we do not evict pageZero
+        BitSet nonEvictablePages = ranges.getNonEvictablePages();
+        int start = nonEvictablePages.nextClearBit(1);
+        while (start < numberOfPages) {
+            int end = nonEvictablePages.nextSetBit(start);
+            int numberOfEvictablePages = end - start;
+            freedSpace += context.punchHole(pageZeroId + start, numberOfEvictablePages);
+
+            start = nonEvictablePages.nextClearBit(end);
+        }
+
+        return freedSpace;
+    }
+}
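
A note on punchHoles above: the walk alternates nextClearBit/nextSetBit over the
non-evictable-pages BitSet, so each punched hole covers a maximal run of evictable
pages. Below is a minimal, self-contained sketch of the same walk; the sentinel bit
and the page layout are assumptions of this sketch (in the patch, ColumnRanges
supplies the BitSet):

    import java.util.BitSet;

    public class PunchHolesSketch {
        public static void main(String[] args) {
            int numberOfPages = 10;
            // Pages that must stay resident: page zero plus, say, pages 4 and 5
            BitSet nonEvictablePages = new BitSet();
            nonEvictablePages.set(0);
            nonEvictablePages.set(4);
            nonEvictablePages.set(5);
            // Sentinel so nextSetBit never returns -1 (an assumption of this sketch)
            nonEvictablePages.set(numberOfPages);

            // Same walk as ColumnSweeper.punchHoles: start at 1, never evict page zero
            int start = nonEvictablePages.nextClearBit(1);
            while (start < numberOfPages) {
                int end = nonEvictablePages.nextSetBit(start);
                System.out.println("punch hole over pages [" + start + ", " + end + ")");
                start = nonEvictablePages.nextClearBit(end);
            }
            // Prints holes over [1, 4) and [6, 10)
        }
    }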
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java
new file mode 100644
index 0000000000..e51f1d7493
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweeperUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTree;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeLeafFrameFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.utils.ColumnUtil;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+
+public class ColumnSweeperUtil {
+    private ColumnSweeperUtil() {
+    }
+
+    public static final BitSet EMPTY = new BitSet();
+
+    static IColumnProjectionInfo createColumnProjectionInfo(List<ILSMDiskComponent> diskComponents,
+            IColumnTupleProjector projector) throws HyracksDataException {
+        // Get the newest disk component, which has the newest column metadata
+        ILSMDiskComponent newestComponent = diskComponents.get(0);
+        IValueReference columnMetadata = ColumnUtil.getColumnMetadataCopy(newestComponent.getMetadata());
+
+        return projector.createProjectionInfo(columnMetadata);
+    }
+
+    static ColumnBTreeReadLeafFrame createLeafFrame(IColumnProjectionInfo projectionInfo,
+            ILSMDiskComponent diskComponent) {
+        ColumnBTree columnBTree = (ColumnBTree) diskComponent.getIndex();
+        ColumnBTreeLeafFrameFactory leafFrameFactory = (ColumnBTreeLeafFrameFactory) columnBTree.getLeafFrameFactory();
+        return leafFrameFactory.createReadFrame(projectionInfo);
+    }
+}
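
For orientation, the two helpers above compose as sketched below: the projection
info is derived once from the newest component (which carries the newest column
metadata) and is then shared by the per-component leaf frames. The loop and the
diskComponents/projector variables are illustrative, not part of this file:

    // Visible within org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep
    IColumnProjectionInfo info = ColumnSweeperUtil.createColumnProjectionInfo(diskComponents, projector);
    for (ILSMDiskComponent component : diskComponents) {
        ColumnBTreeReadLeafFrame leafFrame = ColumnSweeperUtil.createLeafFrame(info, component);
        // ... sweep the component's mega leaf nodes with this frame ...
    }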
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java
new file mode 100644
index 0000000000..f36a51d149
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ISweepClock.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+@FunctionalInterface
+public interface ISweepClock {
+    long getCurrentTime();
+}
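
Since ISweepClock is a @FunctionalInterface with a single long-valued method, any
time source can be bound with a method reference; the bindings below are
illustrative assumptions, not part of this patch:

    ISweepClock wallClock = System::currentTimeMillis; // wall-clock millis
    ISweepClock monotonic = System::nanoTime;          // monotonic nanos
    long now = wallClock.getCurrentTime();

Tests can substitute a controllable implementation instead, as DummySweepClock
does further below.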
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java
new file mode 100644
index 0000000000..7b51d55520
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/SweepBufferCacheReadContext.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadContext.readAndPersistPage;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.control.nc.io.IOManager;
+import org.apache.hyracks.storage.common.buffercache.BufferCacheHeaderHelper;
+import org.apache.hyracks.storage.common.buffercache.CachedPage;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+import org.apache.hyracks.storage.common.buffercache.context.IBufferCacheReadContext;
+import org.apache.hyracks.storage.common.file.BufferedFileHandle;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+final class SweepBufferCacheReadContext implements IBufferCacheReadContext {
+    static final IBufferCacheReadContext INSTANCE = new SweepBufferCacheReadContext();
+
+    private SweepBufferCacheReadContext() {
+    }
+
+    @Override
+    public void onPin(ICachedPage page) {
+        // NoOp
+    }
+
+    @Override
+    public void onUnpin(ICachedPage page) {
+        // NoOp
+    }
+
+    @Override
+    public boolean isNewPage() {
+        return false;
+    }
+
+    @Override
+    public boolean incrementStats() {
+        // Do not increment the stats for the sweeper
+        return false;
+    }
+
+    @Override
+    public ByteBuffer processHeader(IOManager ioManager, BufferedFileHandle fileHandle, BufferCacheHeaderHelper header,
+            CachedPage cPage) throws HyracksDataException {
+        // Do not persist the page locally, as the local disk is under pressure
+        return readAndPersistPage(ioManager, fileHandle, header, cPage, false);
+    }
+}
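
This context is handed to pin/unpin by the sweeper, mirroring the call shape in
ColumnSweeper.sweepDiskComponent above; context, fileId, and pageId are assumed to
come from that code, so this is an illustrative sketch rather than a new call site:

    IBufferCacheReadContext bcOpCtx = SweepBufferCacheReadContext.INSTANCE;
    CloudCachedPage page = context.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), bcOpCtx);
    try {
        // Read the page; pages fetched from the cloud are not persisted locally,
        // and cache stats are not skewed by sweeper traffic
    } finally {
        context.unpin(page, bcOpCtx);
    }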
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
new file mode 100644
index 0000000000..6bace98fef
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/sweep/ColumnSweepPlannerTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep;
+
+import static org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+import static org.apache.hyracks.util.StorageUtil.getIntSizeInBytes;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Random;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.dummy.DummyColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.dummy.DummySweepClock;
+import org.apache.hyracks.util.StorageUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+
+public class ColumnSweepPlannerTest {
+    private static final int MAX_MEGA_LEAF_NODE_SIZE = getIntSizeInBytes(10, StorageUtil.StorageUnit.MEGABYTE);
+    private static final Random RANDOM = new Random(0);
+    private final DummySweepClock clock = new DummySweepClock();
+
+    @Test
+    public void test10Columns() {
+        int numberOfPrimaryKeys = 1;
+        int numberOfColumns = numberOfPrimaryKeys + 10;
+        int[] columnSizes = createNormalColumnSizes(numberOfPrimaryKeys, numberOfColumns);
+        ColumnSweepPlanner planner = new ColumnSweepPlanner(numberOfPrimaryKeys, clock);
+        IntList projectedColumns = new IntArrayList();
+        DummyColumnProjectionInfo info = new DummyColumnProjectionInfo(numberOfPrimaryKeys, QUERY, projectedColumns);
+
+        // Adjust sizes
+        planner.adjustColumnSizes(columnSizes, numberOfColumns);
+
+        // Project 3 columns
+        projectedColumns(numberOfPrimaryKeys, numberOfColumns, 3, projectedColumns);
+        // Access the projected columns (max 10 times)
+        access(planner, info, true, 10);
+
+        // Advance clock
+        clock.advance(10);
+
+        // Plan for eviction
+        BitSet keptColumns = new BitSet();
+        planner.plan();
+        computeKeptColumns(planner.getPlanCopy(), keptColumns, numberOfColumns);
+
+        // Project another 3 columns
+        projectedColumns(numberOfPrimaryKeys, numberOfColumns, 3, projectedColumns);
+        // Access the projected columns
+        access(planner, info, true, 100);
+
+        // At this point, the plan should change
+        BitSet newKeptColumns = new BitSet();
+        computeKeptColumns(planner.getPlanCopy(), newKeptColumns, numberOfColumns);
+
+        Assert.assertNotEquals(keptColumns, newKeptColumns);
+    }
+
+    private void computeKeptColumns(BitSet plan, BitSet keptColumns, int numberOfColumns) {
+        keptColumns.clear();
+        for (int i = 0; i < numberOfColumns; i++) {
+            if (!plan.get(i)) {
+                keptColumns.set(i);
+            }
+        }
+
+        System.out.println("Kept columns: " + keptColumns);
+    }
+
+    private void access(ColumnSweepPlanner planner, DummyColumnProjectionInfo info, boolean hasSpace, int bound) {
+        int numberOfAccesses = RANDOM.nextInt(1, bound);
+        for (int i = 0; i < numberOfAccesses; i++) {
+            planner.access(info, hasSpace);
+            clock.advance(1);
+        }
+
+        System.out.println("Accessed: " + info + " " + numberOfAccesses + " times");
+    }
+
+    private void projectedColumns(int numberPrimaryKeys, int numberOfColumns, int numberOfProjectedColumns,
+            IntList projectedColumns) {
+        IntSet alreadyProjectedColumns = new IntOpenHashSet();
+        projectedColumns.clear();
+        for (int i = 0; i < numberOfProjectedColumns; i++) {
+            int columnIndex = RANDOM.nextInt(numberPrimaryKeys, numberOfColumns);
+            while (alreadyProjectedColumns.contains(columnIndex)) {
+                columnIndex = RANDOM.nextInt(numberPrimaryKeys, numberOfColumns);
+            }
+            projectedColumns.add(columnIndex);
+            alreadyProjectedColumns.add(columnIndex);
+        }
+    }
+
+    private int[] createNormalColumnSizes(int numberOfPrimaryKeys, int numberOfColumns) {
+        int[] columnSizes = new int[numberOfColumns];
+        double[] normalDistribution = new double[numberOfColumns];
+        double sum = 0.0d;
+        for (int i = 0; i < numberOfColumns; i++) {
+            double value = Math.abs(RANDOM.nextGaussian());
+            normalDistribution[i] = value;
+            sum += value;
+        }
+
+        for (int i = numberOfPrimaryKeys; i < numberOfColumns; i++) {
+            int size = (int) Math.round((normalDistribution[i] / sum) * MAX_MEGA_LEAF_NODE_SIZE);
+            columnSizes[i] = size;
+        }
+
+        System.out.println("Column sizes:");
+        for (int i = 0; i < numberOfColumns; i++) {
+            System.out.println(i + ": " + StorageUtil.toHumanReadableSize(columnSizes[i]));
+        }
+        System.out.println("TotalSize: " + StorageUtil.toHumanReadableSize(Arrays.stream(columnSizes).sum()));
+        return columnSizes;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java
new file mode 100644
index 0000000000..3b9d6ba68e
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummyColumnProjectionInfo.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.dummy;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+
+import it.unimi.dsi.fastutil.ints.IntList;
+
+public class DummyColumnProjectionInfo implements IColumnProjectionInfo {
+    private final int numberOfPrimaryKeys;
+    private final ColumnProjectorType projectorType;
+    private final IntList projectedColumns;
+
+    public DummyColumnProjectionInfo(int numberOfPrimaryKeys, ColumnProjectorType projectorType,
+            IntList projectedColumns) {
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+        this.projectorType = projectorType;
+        this.projectedColumns = projectedColumns;
+    }
+
+    @Override
+    public int getColumnIndex(int ordinal) {
+        return projectedColumns.getInt(ordinal);
+    }
+
+    @Override
+    public int getNumberOfProjectedColumns() {
+        return projectedColumns.size();
+    }
+
+    @Override
+    public int getNumberOfPrimaryKeys() {
+        return numberOfPrimaryKeys;
+    }
+
+    @Override
+    public int getFilteredColumnIndex(int ordinal) {
+        return -1;
+    }
+
+    @Override
+    public int getNumberOfFilteredColumns() {
+        return 0;
+    }
+
+    @Override
+    public ColumnProjectorType getProjectorType() {
+        return projectorType;
+    }
+
+    @Override
+    public String toString() {
+        return projectedColumns.toString();
+    }
+}
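
A hypothetical usage of the dummy info, for illustration (the column indexes and
assertions below are made up):

    // Project columns 1, 3, and 5 with a single primary key
    IntList cols = new IntArrayList(new int[] { 1, 3, 5 });
    DummyColumnProjectionInfo info = new DummyColumnProjectionInfo(1, ColumnProjectorType.QUERY, cols);
    assert info.getColumnIndex(0) == 1;            // ordinal 0 -> column index 1
    assert info.getNumberOfProjectedColumns() == 3;
    assert info.getNumberOfFilteredColumns() == 0; // the dummy projects no filtered columns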
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java
new file mode 100644
index 0000000000..6f209ecbde
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/dummy/DummySweepClock.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.dummy;
+
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.sweep.ISweepClock;
+
+public class DummySweepClock implements ISweepClock {
+    long timestamp;
+
+    @Override
+    public long getCurrentTime() {
+        return timestamp;
+    }
+
+    public void advance(long delta) {
+        timestamp += delta;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
index 1779527fb5..98d2d73321 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/LSMComponentIdUtils.java
@@ -55,15 +55,17 @@ public class LSMComponentIdUtils {
     }
 
     public static void persist(ILSMComponentId id, IComponentMetadata metadata) throws HyracksDataException {
-        LSMComponentId componentId = (LSMComponentId) id;
-        metadata.put(COMPONENT_ID_MIN_KEY, LongPointable.FACTORY.createPointable(componentId.getMinId()));
-        metadata.put(COMPONENT_ID_MAX_KEY, LongPointable.FACTORY.createPointable(componentId.getMaxId()));
+        metadata.put(COMPONENT_ID_MIN_KEY, LongPointable.FACTORY.createPointable(id.getMinId()));
+        metadata.put(COMPONENT_ID_MAX_KEY, LongPointable.FACTORY.createPointable(id.getMaxId()));
     }
 
     public static ILSMComponentId union(ILSMComponentId id1, ILSMComponentId id2) {
-        long minId = Long.min(((LSMComponentId) id1).getMinId(), ((LSMComponentId) id2).getMinId());
-        long maxId = Long.max(((LSMComponentId) id1).getMaxId(), ((LSMComponentId) id2).getMaxId());
+        long minId = Long.min(id1.getMinId(), id2.getMinId());
+        long maxId = Long.max(id1.getMaxId(), id2.getMaxId());
         return new LSMComponentId(minId, maxId);
     }
 
+    public static boolean isMergedComponent(ILSMComponentId id) {
+        return id.getMinId() < id.getMaxId();
+    }
 }
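
A short sketch of why minId < maxId identifies merged components, assuming (as in
the LSM flush path) that a freshly flushed component gets a point interval with
minId == maxId, while union(...) widens the interval during merges:

    ILSMComponentId flushed = new LSMComponentId(7, 7);
    ILSMComponentId merged = LSMComponentIdUtils.union(new LSMComponentId(5, 5), flushed);

    assert !LSMComponentIdUtils.isMergedComponent(flushed); // [7, 7] -> not merged
    assert LSMComponentIdUtils.isMergedComponent(merged);   // [5, 7] -> merged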