This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new 9cb0210675c Add TimeSlice Control for TableScanOperator
9cb0210675c is described below
commit 9cb0210675cc0dc0863b144fd0116511c31e9769
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Jun 27 13:48:21 2024 +0800
Add TimeSlice Control for TableScanOperator
---
.../source/AbstractSeriesScanOperator.java | 10 +--
.../operator/source/AlignedSeriesScanOperator.java | 15 ++--
.../source/relational/TableScanOperator.java | 96 ++++------------------
3 files changed, 31 insertions(+), 90 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java
index d8c07d75caf..c03764bd61c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesScanOperator.java
@@ -87,7 +87,7 @@ public abstract class AbstractSeriesScanOperator extends
AbstractDataSourceOpera
}
}
- private boolean readFileData() throws IOException {
+ protected boolean readFileData() throws IOException {
while (seriesScanUtil.hasNextFile()) {
if (readChunkData()) {
return true;
@@ -96,7 +96,7 @@ public abstract class AbstractSeriesScanOperator extends
AbstractDataSourceOpera
return false;
}
- private boolean readChunkData() throws IOException {
+ protected boolean readChunkData() throws IOException {
while (seriesScanUtil.hasNextChunk()) {
if (readPageData()) {
return true;
@@ -105,7 +105,7 @@ public abstract class AbstractSeriesScanOperator extends
AbstractDataSourceOpera
return false;
}
- private boolean readPageData() throws IOException {
+ protected boolean readPageData() throws IOException {
if (seriesScanUtil.hasNextPage()) {
TsBlock tsBlock = seriesScanUtil.nextPage();
if (!isEmpty(tsBlock)) {
@@ -116,11 +116,11 @@ public abstract class AbstractSeriesScanOperator extends
AbstractDataSourceOpera
return false;
}
- private boolean isEmpty(TsBlock tsBlock) {
+ protected boolean isEmpty(TsBlock tsBlock) {
return tsBlock == null || tsBlock.isEmpty();
}
- private void appendToBuilder(TsBlock tsBlock) {
+ protected void appendToBuilder(TsBlock tsBlock) {
int size = tsBlock.getPositionCount();
if (resultTsBlockBuilder.isEmpty() && size >=
resultTsBlockBuilder.getMaxTsBlockLineNumber()) {
retainedTsBlock = tsBlock;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
index 63605457aec..5ab2a0f0055 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
@@ -86,22 +86,27 @@ public class AlignedSeriesScanOperator extends
AbstractSeriesScanOperator {
@Override
protected void buildResult(TsBlock tsBlock) {
+ appendDataIntoBuilder(tsBlock, resultTsBlockBuilder);
+ }
+
+ public static void appendDataIntoBuilder(TsBlock tsBlock, TsBlockBuilder
builder) {
int size = tsBlock.getPositionCount();
- TimeColumnBuilder timeColumnBuilder =
resultTsBlockBuilder.getTimeColumnBuilder();
+ TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
TimeColumn timeColumn = tsBlock.getTimeColumn();
for (int i = 0; i < size; i++) {
timeColumnBuilder.writeLong(timeColumn.getLong(i));
- resultTsBlockBuilder.declarePosition();
+ builder.declarePosition();
}
for (int columnIndex = 0, columnSize = tsBlock.getValueColumnCount();
columnIndex < columnSize;
columnIndex++) {
- appendOneColumn(columnIndex, tsBlock, size);
+ appendOneColumn(columnIndex, tsBlock, size, builder);
}
}
- private void appendOneColumn(int columnIndex, TsBlock tsBlock, int size) {
- ColumnBuilder columnBuilder =
resultTsBlockBuilder.getColumnBuilder(columnIndex);
+ private static void appendOneColumn(
+ int columnIndex, TsBlock tsBlock, int size, TsBlockBuilder builder) {
+ ColumnBuilder columnBuilder = builder.getColumnBuilder(columnIndex);
Column column = tsBlock.getColumn(columnIndex);
if (column.mayHaveNull()) {
for (int i = 0; i < size; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
index c998203d7b5..deded9e1bd6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
@@ -23,7 +23,7 @@ import org.apache.iotdb.commons.path.AlignedFullPath;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesScanOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
@@ -34,7 +34,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
@@ -42,8 +41,6 @@ import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
import org.apache.tsfile.read.common.block.column.BinaryColumn;
import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
-import org.apache.tsfile.read.common.block.column.TimeColumn;
-import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -55,9 +52,10 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator.appendDataIntoBuilder;
import static
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
-public class TableScanOperator extends AbstractDataSourceOperator {
+public class TableScanOperator extends AbstractSeriesScanOperator {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(TableScanOperator.class);
@@ -140,6 +138,8 @@ public class TableScanOperator extends
AbstractDataSourceOperator {
long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long start = System.nanoTime();
+ boolean currentDeviceNoMoreData = false;
+
// here use do-while to promise doing this at least once
do {
/*
@@ -148,6 +148,7 @@ public class TableScanOperator extends
AbstractDataSourceOperator {
* 3. consume next file finally
*/
if (!readPageData() && !readChunkData() && !readFileData()) {
+ currentDeviceNoMoreData = true;
break;
}
@@ -156,7 +157,9 @@ public class TableScanOperator extends
AbstractDataSourceOperator {
&& measurementDataBlock == null);
// current device' data is consumed up
- if (measurementDataBuilder.isEmpty() && measurementDataBlock == null) {
+ if (measurementDataBuilder.isEmpty()
+ && measurementDataBlock == null
+ && currentDeviceNoMoreData) {
currentDeviceIndex++;
prepareForNextDevice();
}
@@ -181,75 +184,19 @@ public class TableScanOperator extends
AbstractDataSourceOperator {
return checkTsBlockSizeAndGetResult();
}
- private boolean readFileData() throws IOException {
- while (seriesScanUtil.hasNextFile()) {
- if (readChunkData()) {
- return true;
- }
- }
- return false;
- }
-
- private boolean readChunkData() throws IOException {
- while (seriesScanUtil.hasNextChunk()) {
- if (readPageData()) {
- return true;
- }
- }
- return false;
- }
-
- private boolean readPageData() throws IOException {
- while (seriesScanUtil.hasNextPage()) {
- TsBlock tsBlock = seriesScanUtil.nextPage();
- if (!isEmpty(tsBlock)) {
- appendToBuilder(tsBlock);
- return true;
- }
- }
- return false;
- }
-
- private void appendToBuilder(TsBlock tsBlock) {
- int size = tsBlock.getPositionCount();
+ @Override
+ protected void appendToBuilder(TsBlock tsBlock) {
if (measurementDataBuilder.isEmpty()
&& tsBlock.getPositionCount() >=
measurementDataBuilder.getMaxTsBlockLineNumber()) {
measurementDataBlock = tsBlock;
return;
}
- TimeColumnBuilder timeColumnBuilder =
measurementDataBuilder.getTimeColumnBuilder();
- TimeColumn timeColumn = tsBlock.getTimeColumn();
- for (int i = 0; i < size; i++) {
- timeColumnBuilder.writeLong(timeColumn.getLong(i));
- measurementDataBuilder.declarePosition();
- }
- for (int columnIndex = 0, columnSize = tsBlock.getValueColumnCount();
- columnIndex < columnSize;
- columnIndex++) {
- appendOneColumn(columnIndex, tsBlock, size);
- }
- }
-
- private void appendOneColumn(int columnIndex, TsBlock tsBlock, int size) {
- ColumnBuilder columnBuilder =
measurementDataBuilder.getColumnBuilder(columnIndex);
- Column column = tsBlock.getColumn(columnIndex);
- if (column.mayHaveNull()) {
- for (int i = 0; i < size; i++) {
- if (column.isNull(i)) {
- columnBuilder.appendNull();
- } else {
- columnBuilder.write(column, i);
- }
- }
- } else {
- for (int i = 0; i < size; i++) {
- columnBuilder.write(column, i);
- }
- }
+ appendDataIntoBuilder(tsBlock, measurementDataBuilder);
}
- private boolean isEmpty(TsBlock tsBlock) {
- return tsBlock == null || tsBlock.isEmpty();
+ @Override
+ protected void buildResult(TsBlock tsBlock) {
+ throw new UnsupportedOperationException();
}
private void constructResultTsBlock() {
@@ -309,18 +256,7 @@ public class TableScanOperator extends
AbstractDataSourceOperator {
@Override
public long calculateMaxPeekMemory() {
return (1L + columnsIndexArray.length)
- * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
- * 3L;
- }
-
- @Override
- public long calculateMaxReturnSize() {
- return maxReturnSize;
- }
-
- @Override
- public long calculateRetainedSizeAfterCallingNext() {
- return calculateMaxPeekMemoryWithCounter();
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
}
@Override