This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/ScheduleOpt in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a6f0abd09c564f31ef426d0d95c3b547c2cf32d1 Author: JackieTien97 <[email protected]> AuthorDate: Wed Nov 22 16:25:24 2023 +0800 Accelerate the process of PlanNode to Operator --- .../source/AbstractDataSourceOperator.java | 10 +++++++++ .../AbstractSeriesAggregationScanOperator.java | 19 ++++++++-------- .../operator/source/AlignedSeriesScanOperator.java | 26 ++++++++++++---------- .../operator/source/SeriesScanOperator.java | 26 +++++++++++++--------- 4 files changed, 48 insertions(+), 33 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java index d879927b5e0..41fc28f4a9c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractDataSourceOperator.java @@ -20,13 +20,23 @@ package org.apache.iotdb.db.queryengine.execution.operator.source; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; + +import java.util.List; public abstract class AbstractDataSourceOperator extends AbstractSourceOperator implements DataSourceOperator { protected SeriesScanUtil seriesScanUtil; + // Using for building result tsBlock + protected TsBlockBuilder resultTsBlockBuilder; + @Override public void initQueryDataSource(QueryDataSource dataSource) { seriesScanUtil.initQueryDataSource(dataSource); + resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); } + + protected abstract List<TSDataType> getResultDataTypes(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java index b3fbce48248..d353bd9e9f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java @@ -29,7 +29,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import org.apache.iotdb.tsfile.utils.Pair; import java.io.IOException; @@ -59,9 +58,6 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData // But in facing of statistics, it will invoke another method processStatistics() protected final List<Aggregator> aggregators; - // Using for building result tsBlock - protected final TsBlockBuilder resultTsBlockBuilder; - protected boolean finished = false; private final long cachedRawDataSize; @@ -89,12 +85,6 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData this.aggregators = aggregators; this.timeRangeIterator = timeRangeIterator; - List<TSDataType> dataTypes = new ArrayList<>(); - for (Aggregator aggregator : aggregators) { - dataTypes.addAll(Arrays.asList(aggregator.getOutputType())); - } - this.resultTsBlockBuilder = new TsBlockBuilder(dataTypes); - this.cachedRawDataSize = (1L + subSensorSize) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); this.maxReturnSize = maxReturnSize; @@ -389,4 +379,13 @@ public abstract class AbstractSeriesAggregationScanOperator extends AbstractData && currentPageStatistics.containedByTimeFilter(seriesScanUtil.getGlobalTimeFilter()) && !seriesScanUtil.currentPageModified(); } + + @Override + protected List<TSDataType> getResultDataTypes() { + List<TSDataType> dataTypes = new ArrayList<>(); + for (Aggregator aggregator : aggregators) { + dataTypes.addAll(Arrays.asList(aggregator.getOutputType())); + } + return dataTypes; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java index 47357a7d39e..344b194a557 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java @@ -27,7 +27,6 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; @@ -41,7 +40,6 @@ import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder.MAX_LINE_ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { - private final TsBlockBuilder builder; private final int valueColumnCount; private boolean finished = false; @@ -63,8 +61,6 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { context.getInstanceContext(), queryAllSensors, dataTypes); - // time + all value columns - this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList()); this.valueColumnCount = seriesPath.getColumnNum(); this.maxReturnSize = Math.min( @@ -78,8 +74,8 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { if (retainedTsBlock != null) { return getResultFromRetainedTsBlock(); } - resultTsBlock = builder.build(); - builder.reset(); + resultTsBlock = resultTsBlockBuilder.build(); + resultTsBlockBuilder.reset(); return checkTsBlockSizeAndGetResult(); } @@ -107,10 +103,10 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { } } while (System.nanoTime() - start < maxRuntime - && !builder.isFull() + && !resultTsBlockBuilder.isFull() && retainedTsBlock == null); - finished = (builder.isEmpty() && retainedTsBlock == null); + finished = (resultTsBlockBuilder.isEmpty() && retainedTsBlock == null); return !finished; } catch (IOException e) { @@ -171,15 +167,15 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { private void appendToBuilder(TsBlock tsBlock) { int size = tsBlock.getPositionCount(); - if (builder.isEmpty() && tsBlock.getPositionCount() >= MAX_LINE_NUMBER) { + if (resultTsBlockBuilder.isEmpty() && tsBlock.getPositionCount() >= MAX_LINE_NUMBER) { retainedTsBlock = tsBlock; return; } - TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder(); + TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); TimeColumn timeColumn = tsBlock.getTimeColumn(); for (int i = 0; i < size; i++) { timeColumnBuilder.writeLong(timeColumn.getLong(i)); - builder.declarePosition(); + resultTsBlockBuilder.declarePosition(); } for (int columnIndex = 0, columnSize = tsBlock.getValueColumnCount(); columnIndex < columnSize; @@ -189,7 +185,7 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { } private void appendOneColumn(int columnIndex, TsBlock tsBlock, int size) { - ColumnBuilder columnBuilder = builder.getColumnBuilder(columnIndex); + ColumnBuilder columnBuilder = resultTsBlockBuilder.getColumnBuilder(columnIndex); Column column = tsBlock.getColumn(columnIndex); if (column.mayHaveNull()) { for (int i = 0; i < size; i++) { @@ -209,4 +205,10 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { private boolean isEmpty(TsBlock tsBlock) { return tsBlock == null || tsBlock.isEmpty(); } + + @Override + protected List<TSDataType> getResultDataTypes() { + // time + all value columns + return seriesScanUtil.getTsDataTypeList(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java index 7fd6d7291e2..acdbdf5ad6c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java @@ -25,19 +25,19 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import java.io.IOException; +import java.util.List; import java.util.concurrent.TimeUnit; public class SeriesScanOperator extends AbstractDataSourceOperator { - private final TsBlockBuilder builder; private boolean finished = false; public SeriesScanOperator( @@ -52,7 +52,6 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { new SeriesScanUtil(seriesPath, scanOrder, seriesScanOptions, context.getInstanceContext()); this.maxReturnSize = Math.min(maxReturnSize, TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()); - this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList()); } @Override @@ -60,8 +59,8 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { if (retainedTsBlock != null) { return getResultFromRetainedTsBlock(); } - resultTsBlock = builder.build(); - builder.reset(); + resultTsBlock = resultTsBlockBuilder.build(); + resultTsBlockBuilder.reset(); return checkTsBlockSizeAndGetResult(); } @@ -87,9 +86,9 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { if (!readPageData() && !readChunkData() && !readFileData()) { break; } - } while (System.nanoTime() - start < maxRuntime && !builder.isFull()); + } while (System.nanoTime() - start < maxRuntime && !resultTsBlockBuilder.isFull()); - finished = builder.isEmpty(); + finished = resultTsBlockBuilder.isEmpty(); return !finished; } catch (IOException e) { @@ -148,9 +147,9 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { } private void appendToBuilder(TsBlock tsBlock) { - TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder(); + TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); TimeColumn timeColumn = tsBlock.getTimeColumn(); - ColumnBuilder columnBuilder = builder.getColumnBuilder(0); + ColumnBuilder columnBuilder = resultTsBlockBuilder.getColumnBuilder(0); Column column = tsBlock.getColumn(0); if (column.mayHaveNull()) { @@ -161,13 +160,13 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { } else { columnBuilder.write(column, i); } - builder.declarePosition(); + resultTsBlockBuilder.declarePosition(); } } else { for (int i = 0, size = tsBlock.getPositionCount(); i < size; i++) { timeColumnBuilder.writeLong(timeColumn.getLong(i)); columnBuilder.write(column, i); - builder.declarePosition(); + resultTsBlockBuilder.declarePosition(); } } } @@ -175,4 +174,9 @@ public class SeriesScanOperator extends AbstractDataSourceOperator { private boolean isEmpty(TsBlock tsBlock) { return tsBlock == null || tsBlock.isEmpty(); } + + @Override + protected List<TSDataType> getResultDataTypes() { + return seriesScanUtil.getTsDataTypeList(); + } }
