This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/optimize_for_multiply_device_limit_query
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 59e093cac1626ccfa5475a755d65b32462d0f864 Author: Beyyes <[email protected]> AuthorDate: Sun Jan 28 17:24:37 2024 +0800 fix max_tsblock_line_num optimization --- .../operator/source/AlignedSeriesScanOperator.java | 19 +++++++-- .../db/queryengine/plan/analyze/TemplatedInfo.java | 1 + .../plan/planner/OperatorTreeGenerator.java | 12 +++++- .../plan/planner/TemplatedLogicalPlan.java | 47 ++++++++++------------ .../operator/AlignedSeriesScanOperatorTest.java | 15 ++++--- .../execution/operator/OperatorMemoryTest.java | 3 +- .../tsfile/read/common/block/TsBlockBuilder.java | 17 ++++++-- 7 files changed, 74 insertions(+), 40 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java index 346c80139f9..840f682b559 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java @@ -24,9 +24,11 @@ import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; import 
org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; @@ -36,12 +38,11 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; -import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder.MAX_LINE_NUMBER; - public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { private final int valueColumnCount; private boolean finished = false; + private int maxTsBlockLineNum = -1; public AlignedSeriesScanOperator( OperatorContext context, @@ -50,7 +51,8 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { Ordering scanOrder, SeriesScanOptions seriesScanOptions, boolean queryAllSensors, - List<TSDataType> dataTypes) { + List<TSDataType> dataTypes, + int maxTsBlockLineNum) { this.sourceId = sourceId; this.operatorContext = context; this.seriesScanUtil = @@ -67,6 +69,7 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { maxReturnSize, (1L + valueColumnCount) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()); + this.maxTsBlockLineNum = maxTsBlockLineNum; } @Override @@ -169,7 +172,8 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { private void appendToBuilder(TsBlock tsBlock) { int size = tsBlock.getPositionCount(); - if (resultTsBlockBuilder.isEmpty() && tsBlock.getPositionCount() >= MAX_LINE_NUMBER) { + if (resultTsBlockBuilder.isEmpty() + && tsBlock.getPositionCount() >= resultTsBlockBuilder.getMaxTsBlockLineNumber()) { retainedTsBlock = tsBlock; return; } @@ -213,4 +217,11 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { // time + all value columns return seriesScanUtil.getTsDataTypeList(); } + + @Override + public void initQueryDataSource(QueryDataSource dataSource) { + seriesScanUtil.initQueryDataSource(dataSource); + resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes()); + resultTsBlockBuilder.setMaxTsBlockLineNumber(this.maxTsBlockLineNum); + } } diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java index 48da26c1f37..40e391349a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedInfo.java @@ -65,6 +65,7 @@ public class TemplatedInfo { private Map<String, IMeasurementSchema> schemaMap; // not serialize private Map<String, List<InputLocation>> layoutMap; + private int maxTsBlockLineNum = -1; public TemplatedInfo( List<String> measurementList, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index cbf8db730de..46b2bb72603 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -358,6 +358,15 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP context.getNextOperatorId(), node.getPlanNodeId(), AlignedSeriesScanOperator.class.getSimpleName()); + + int maxTsBlockLineNum = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(); + if (context.getTypeProvider().getTemplatedInfo() != null) { + maxTsBlockLineNum = + (int) + Math.min( + context.getTypeProvider().getTemplatedInfo().getLimitValue(), maxTsBlockLineNum); + } + AlignedSeriesScanOperator seriesScanOperator = new AlignedSeriesScanOperator( operatorContext, @@ -368,7 +377,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.isQueryAllSensors(), context.getTypeProvider().getTemplatedInfo() != null ? 
context.getTypeProvider().getTemplatedInfo().getDataTypes() - : null); + : null, + maxTsBlockLineNum); ((DataDriverContext) context.getDriverContext()).addSourceOperator(seriesScanOperator); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java index 2819b592ad8..eac384da629 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java @@ -89,36 +89,33 @@ public class TemplatedLogicalPlan { } private void initCommonVariables() { - if (whereExpression != null) { - - if (!analysis.isTemplateWildCardQuery()) { - newMeasurementList = new ArrayList<>(measurementList); - newSchemaList = new ArrayList<>(schemaList); - Set<String> selectExpressions = new HashSet<>(measurementList); - List<Expression> whereSourceExpressions = searchSourceExpressions(whereExpression); - for (Expression expression : whereSourceExpressions) { - if (expression instanceof TimeSeriesOperand) { - String measurement = ((TimeSeriesOperand) expression).getPath().getMeasurement(); - if (!analysis.getDeviceTemplate().getSchemaMap().containsKey(measurement)) { - continue; - } - if (!selectExpressions.contains(measurement)) { - selectExpressions.add(measurement); - newMeasurementList.add(measurement); - newSchemaList.add(analysis.getDeviceTemplate().getSchema(measurement)); - } + if (!analysis.isTemplateWildCardQuery()) { + newMeasurementList = new ArrayList<>(measurementList); + newSchemaList = new ArrayList<>(schemaList); + Set<String> selectExpressions = new HashSet<>(measurementList); + List<Expression> whereSourceExpressions = searchSourceExpressions(whereExpression); + for (Expression expression : 
whereSourceExpressions) { + if (expression instanceof TimeSeriesOperand) { + String measurement = ((TimeSeriesOperand) expression).getPath().getMeasurement(); + if (!analysis.getDeviceTemplate().getSchemaMap().containsKey(measurement)) { + continue; + } + if (!selectExpressions.contains(measurement)) { + selectExpressions.add(measurement); + newMeasurementList.add(measurement); + newSchemaList.add(analysis.getDeviceTemplate().getSchema(measurement)); } } } + } - filterLayoutMap = makeLayout(newMeasurementList); + filterLayoutMap = makeLayout(newMeasurementList); - analysis - .getExpressionTypes() - .forEach( - (key, value) -> - context.getTypeProvider().setType(key.getNode().getOutputSymbol(), value)); - } + analysis + .getExpressionTypes() + .forEach( + (key, value) -> + context.getTypeProvider().setType(key.getNode().getOutputSymbol(), value)); context .getTypeProvider() diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java index 360ab9e2d0c..b5c61944205 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java @@ -129,7 +129,8 @@ public class AlignedSeriesScanOperatorTest { Ordering.ASC, getDefaultSeriesScanOptions(alignedPath), false, - null); + null, + -1); seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator .getOperatorContext() @@ -224,7 +225,8 @@ public class AlignedSeriesScanOperatorTest { Ordering.ASC, getDefaultSeriesScanOptions(alignedPath1), false, - null); + null, + -1); seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator1 
.getOperatorContext() @@ -247,7 +249,8 @@ public class AlignedSeriesScanOperatorTest { Ordering.ASC, getDefaultSeriesScanOptions(alignedPath2), false, - null); + null, + -1); seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator2 .getOperatorContext() @@ -518,7 +521,8 @@ public class AlignedSeriesScanOperatorTest { Ordering.DESC, getDefaultSeriesScanOptions(alignedPath1), false, - null); + null, + -1); seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator1 .getOperatorContext() @@ -541,7 +545,8 @@ public class AlignedSeriesScanOperatorTest { Ordering.DESC, getDefaultSeriesScanOptions(alignedPath2), false, - null); + null, + -1); seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator2 .getOperatorContext() diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java index fd5eb9b8026..0950f85e021 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java @@ -196,7 +196,8 @@ public class OperatorMemoryTest { Ordering.ASC, SeriesScanOptions.getDefaultSeriesScanOptions(alignedPath), false, - null); + null, + -1); long maxPeekMemory = Math.max( diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java index 762fc47f4ed..890f9aeee0c 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java +++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlockBuilder.java @@ -48,9 +48,6 @@ public class TsBlockBuilder { // This could be any other small number. private static final int DEFAULT_INITIAL_EXPECTED_ENTRIES = 8; - public static final int MAX_LINE_NUMBER = - TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(); - private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); @@ -59,6 +56,8 @@ public class TsBlockBuilder { private List<TSDataType> types; private TsBlockBuilderStatus tsBlockBuilderStatus; private int declaredPositions; + private int maxTsBlockLineNumber = + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(); private TsBlockBuilder() {} @@ -265,7 +264,7 @@ public class TsBlockBuilder { } public boolean isFull() { - return declaredPositions >= MAX_LINE_NUMBER || tsBlockBuilderStatus.isFull(); + return declaredPositions >= maxTsBlockLineNumber || tsBlockBuilderStatus.isFull(); } public boolean isEmpty() { @@ -276,6 +275,16 @@ public class TsBlockBuilder { return declaredPositions; } + public int getMaxTsBlockLineNumber() { + return this.maxTsBlockLineNumber; + } + + public void setMaxTsBlockLineNumber(int maxTsBlockLineNumber) { + if (maxTsBlockLineNumber > 0) { + this.maxTsBlockLineNumber = maxTsBlockLineNumber; + } + } + public long getSizeInBytes() { return tsBlockBuilderStatus.getSizeInBytes(); }
