This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/multi_devices_fe in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5d917e3cafc01ce2471f3299b7f0bbadef8363a0 Author: Beyyes <[email protected]> AuthorDate: Sun Nov 19 16:38:09 2023 +0800 optimize the data types in align by device query + series scan operator --- .../operator/source/AlignedSeriesScanOperator.java | 8 ++++++-- .../operator/source/AlignedSeriesScanUtil.java | 13 +++++++++---- .../db/queryengine/plan/analyze/TemplatedAnalyze.java | 4 ++-- .../iotdb/db/queryengine/plan/analyze/TypeProvider.java | 17 ++++++++++++++++- .../db/queryengine/plan/execution/QueryExecution.java | 10 ++++++---- .../db/queryengine/plan/planner/LogicalPlanBuilder.java | 15 ++++++++++++--- .../queryengine/plan/planner/OperatorTreeGenerator.java | 3 ++- .../queryengine/plan/planner/SubPlanTypeExtractor.java | 1 + .../queryengine/plan/planner/TemplatedLogicalPlan.java | 16 +++++++++++++--- .../plan/planner/TemplatedLogicalPlanBuilder.java | 6 ------ .../plan/planner/distribution/ExchangeNodeAdder.java | 3 --- .../operator/AlignedSeriesScanOperatorTest.java | 15 ++++++++++----- .../execution/operator/OperatorMemoryTest.java | 3 ++- 13 files changed, 79 insertions(+), 35 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java index c80bed6963d..ecf673de321 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import org.apache.iotdb.tsfile.read.common.block.column.Column; @@ -33,6 +34,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import java.io.IOException; +import java.util.List; import java.util.concurrent.TimeUnit; import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder.MAX_LINE_NUMBER; @@ -49,7 +51,8 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { AlignedPath seriesPath, Ordering scanOrder, SeriesScanOptions seriesScanOptions, - boolean queryAllSensors) { + boolean queryAllSensors, + List<TSDataType> dataTypes) { this.sourceId = sourceId; this.operatorContext = context; this.seriesScanUtil = @@ -58,7 +61,8 @@ public class AlignedSeriesScanOperator extends AbstractDataSourceOperator { scanOrder, seriesScanOptions, context.getInstanceContext(), - queryAllSensors); + queryAllSensors, + dataTypes); // time + all value columns this.builder = new TsBlockBuilder(seriesScanUtil.getTsDataTypeList()); this.valueColumnCount = seriesPath.getColumnNum(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java index 300412081f4..9adbd6ffd33 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java @@ -60,7 +60,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { Ordering scanOrder, SeriesScanOptions scanOptions, FragmentInstanceContext context) { - this(seriesPath, scanOrder, scanOptions, context, false); + this(seriesPath, scanOrder, scanOptions, context, false, null); } public AlignedSeriesScanUtil( @@ -68,11 +68,16 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil { Ordering scanOrder, SeriesScanOptions scanOptions, FragmentInstanceContext context, - boolean queryAllSensors) { + boolean queryAllSensors, + List<TSDataType> givenDataTypes) { super(seriesPath, scanOrder, scanOptions, context); dataTypes = - ((AlignedPath) seriesPath) - .getSchemaList().stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); + givenDataTypes != null + ? givenDataTypes + : ((AlignedPath) seriesPath) + .getSchemaList().stream() + .map(IMeasurementSchema::getType) + .collect(Collectors.toList()); isAligned = true; this.queryAllSensors = queryAllSensors; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java index bd787c69f62..0f00718e50d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedAnalyze.java @@ -236,8 +236,8 @@ public class TemplatedAnalyze { normalizeExpression(analyzeWhereSplitByDevice(queryStatement, devicePath, schemaTree)); } catch (MeasurementNotExistException e) { logger.warn( - "Meets MeasurementNotExistException in analyzeDeviceToWhere when executing align by device, " - + "error msg: {}", + "Meets MeasurementNotExistException in analyzeDeviceToWhere " + + "when executing align by device, error msg: {}", e.getMessage()); deviceIterator.remove(); continue; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java index de56dbfdcb0..feddf18226e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java @@ -35,9 +35,12 @@ public class TypeProvider { private final Map<String, TSDataType> typeMap; - // measurementList, schemaList is not null only when all devices in one template + ///////////////////////////////////////////////////////////////////////////////////////////////// + // All Queries Devices Set In One Template + ///////////////////////////////////////////////////////////////////////////////////////////////// private List<String> measurementList; private List<IMeasurementSchema> schemaList; + private List<TSDataType> dataTypes; public TypeProvider() { this.typeMap = new HashMap<>(); @@ -103,6 +106,10 @@ public class TypeProvider { return Objects.hash(typeMap); } + ///////////////////////////////////////////////////////////////////////////////////////////////// + // All Queries Devices Set In One Template + ///////////////////////////////////////////////////////////////////////////////////////////////// + public void setMeasurementList(List<String> measurementList) { this.measurementList = measurementList; } @@ -118,4 +125,12 @@ public class TypeProvider { public List<IMeasurementSchema> getSchemaList() { return this.schemaList; } + + public void setDataTypes(List<TSDataType> dataTypes) { + this.dataTypes = dataTypes; + } + + public List<TSDataType> getDataTypes() { + return this.dataTypes; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 3b02fea3b7b..0e4ca231b3c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -217,15 +217,16 @@ public class QueryExecution implements IQueryExecution { return; } - long sTime = System.currentTimeMillis(); + long lastRecordTime = System.currentTimeMillis(); // check timeout for query first checkTimeOutForQuery(); doLogicalPlan(); - logger.warn("--- [doLogicalPlan] : {}ms", System.currentTimeMillis() - sTime); - sTime = System.currentTimeMillis(); + logger.warn("--- [doLogicalPlan] : {}ms", System.currentTimeMillis() - lastRecordTime); + lastRecordTime = System.currentTimeMillis(); doDistributedPlan(); - logger.warn("--- [doDistributedPlan] : {}ms", System.currentTimeMillis() - sTime); + logger.warn("--- [doDistributedPlan] : {}ms", System.currentTimeMillis() - lastRecordTime); + lastRecordTime = System.currentTimeMillis(); // update timeout after finishing plan stage context.setTimeOut( @@ -237,6 +238,7 @@ public class QueryExecution implements IQueryExecution { } PERFORMANCE_OVERVIEW_METRICS.recordPlanCost(System.nanoTime() - startTime); schedule(); + logger.warn("--- [schedule] : {}ms", System.currentTimeMillis() - lastRecordTime); // set partial insert error message // When some columns in one insert failed, other column will continue executing insertion. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index 5ebf6fe3c1b..b79d2037874 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -777,13 +777,16 @@ public class LogicalPlanBuilder { ? queryStatement.getRowOffset() + queryStatement.getRowLimit() : queryStatement.getRowLimit(); + // 1. LIMIT and LIMIT_VALUE is smaller than 1000000, + // 2. `order by based on time` or `order by based on expression`, + // 3. no aggregation, + // when satisfy all above requirements use ToKNode. if (!queryStatement.isAggregationQuery() && queryStatement.hasLimit() && queryStatement.getOrderByComponent() != null && !queryStatement.isOrderByBasedOnDevice() && limitValue <= LIMIT_USE_TOP_K_FOR_ALIGN_BY_DEVICE) { - // order by time and order by expression with limit, can be optimized to TopK implementation TopKNode topKNode = new TopKNode( context.getQueryId().genPlanNodeId(), @@ -794,7 +797,8 @@ public class LogicalPlanBuilder { // if value filter exists, need add a LIMIT-NODE as the child node of TopKNode long valueFilterLimit = queryStatement.hasWhere() ? limitValue : -1; - if ((queryStatement.isOrderByBasedOnTime() && !queryStatement.hasOrderByExpression())) { + // order by based on time, use TopKNode + SingleDeviceViewNode + if (queryStatement.isOrderByBasedOnTime() && !queryStatement.hasOrderByExpression()) { addSingleDeviceViewNodes( topKNode, deviceNameToSourceNodesMap, @@ -802,6 +806,7 @@ public class LogicalPlanBuilder { deviceToMeasurementIndexesMap, valueFilterLimit); } else { + // order by based on expression, use TopKNode + DeviceViewNode topKNode.addChild( addDeviceViewNode( orderByParameter, @@ -813,9 +818,12 @@ public class LogicalPlanBuilder { this.root = topKNode; } - // order by time + no limit, device can be optimized by SingleDeviceViewNode and MergeSortNode + // 1. `order by based on time` + `no order by expression`, + // 2. no LIMIT or LIMIT_VALUE is larger than 1000000, + // when satisfy all above requirements use MergeSortNode. else if (queryStatement.isOrderByBasedOnTime() && !queryStatement.hasOrderByExpression()) { if (deviceNameToSourceNodesMap.size() == 1) { + // only one device, use DeviceViewNode, no need MergeSortNode this.root = addDeviceViewNode( orderByParameter, @@ -824,6 +832,7 @@ public class LogicalPlanBuilder { deviceNameToSourceNodesMap, -1); } else { + // otherwise use MergeSortNode + SingleDeviceViewNode MergeSortNode mergeSortNode = new MergeSortNode( context.getQueryId().genPlanNodeId(), orderByParameter, outputColumnNames); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 55d3cc47f30..45b8ca02b3d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -362,7 +362,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP seriesPath, node.getScanOrder(), seriesScanOptionsBuilder.build(), - node.isQueryAllSensors()); + node.isQueryAllSensors(), + context.getTypeProvider().getDataTypes()); ((DataDriverContext) context.getDriverContext()).addSourceOperator(seriesScanOperator); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java index abda915cf66..8ad60acc063 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java @@ -48,6 +48,7 @@ public class SubPlanTypeExtractor { TypeProvider typeProvider = new TypeProvider(); typeProvider.setSchemaList(allTypes.getSchemaList()); typeProvider.setMeasurementList(allTypes.getMeasurementList()); + typeProvider.setDataTypes(allTypes.getDataTypes()); root.accept(new Visitor(typeProvider, allTypes), null); return typeProvider; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java index 78e6d88c2d1..5e38a328aa0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.queryengine.plan.planner; import org.apache.iotdb.commons.path.MeasurementPath; @@ -35,6 +36,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchSourceExpressions; import static org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanVisitor.pushDownLimitToScanNode; @@ -55,9 +57,6 @@ public class TemplatedLogicalPlan { measurementList = new ArrayList<>(analysis.getMeasurementList()); schemaList = new ArrayList<>(analysis.getMeasurementSchemaList()); - - context.getTypeProvider().setMeasurementList(measurementList); - context.getTypeProvider().setSchemaList(schemaList); } public PlanNode visitQuery() { @@ -178,6 +177,17 @@ public class TemplatedLogicalPlan { queryStatement.getResultTimeOrder()); } + if (context.getTypeProvider().getMeasurementList() == null) { + context.getTypeProvider().setMeasurementList(mergedMeasurementList); + context.getTypeProvider().setSchemaList(mergedSchemaList); + context + .getTypeProvider() + .setDataTypes( + mergedSchemaList.stream() + .map(IMeasurementSchema::getType) + .collect(Collectors.toList())); + } + return planBuilder.getRoot(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java index a068f935164..423db389b4b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java @@ -30,12 +30,9 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; -import org.apache.iotdb.tsfile.enums.TSDataType; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; -import com.google.common.base.Function; - import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; @@ -51,8 +48,6 @@ public class TemplatedLogicalPlanBuilder extends LogicalPlanBuilder { private final List<String> measurementList; private final List<IMeasurementSchema> schemaList; - private final Function<Expression, TSDataType> getPreAnalyzedType; - public TemplatedLogicalPlanBuilder( Analysis analysis, MPPQueryContext context, @@ -63,7 +58,6 @@ public class TemplatedLogicalPlanBuilder extends LogicalPlanBuilder { this.context = context; this.measurementList = measurementList; this.schemaList = schemaList; - this.getPreAnalyzedType = analysis::getType; } public TemplatedLogicalPlanBuilder planRawDataSource( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java index d22958a5a1f..79d9f6beb96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java @@ -381,9 +381,6 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { TopKNode rootNode = (TopKNode) node; Map<TRegionReplicaSet, TopKNode> regionTopKNodeMap = new HashMap<>(); for (PlanNode child : visitedChildren) { - if (child instanceof SingleDeviceViewNode) { - ((SingleDeviceViewNode) child).setCacheOutputColumnNames(true); - } TRegionReplicaSet region = context.getNodeDistribution(child.getPlanNodeId()).region; regionTopKNodeMap .computeIfAbsent( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java index 4ab3237998f..4965a212ac5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AlignedSeriesScanOperatorTest.java @@ -128,7 +128,8 @@ public class AlignedSeriesScanOperatorTest { alignedPath, Ordering.ASC, getDefaultSeriesScanOptions(alignedPath), - false); + false, + null); seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator .getOperatorContext() @@ -222,7 +223,8 @@ public class AlignedSeriesScanOperatorTest { alignedPath1, Ordering.ASC, getDefaultSeriesScanOptions(alignedPath1), - false); + false, + null); seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator1 .getOperatorContext() @@ -244,7 +246,8 @@ public class AlignedSeriesScanOperatorTest { alignedPath2, Ordering.ASC, getDefaultSeriesScanOptions(alignedPath2), - false); + false, + null); seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator2 .getOperatorContext() @@ -514,7 +517,8 @@ public class AlignedSeriesScanOperatorTest { alignedPath1, Ordering.DESC, getDefaultSeriesScanOptions(alignedPath1), - false); + false, + null); seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator1 .getOperatorContext() @@ -536,7 +540,8 @@ public class AlignedSeriesScanOperatorTest { alignedPath2, Ordering.DESC, getDefaultSeriesScanOptions(alignedPath2), - false); + false, + null); seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources)); seriesScanOperator2 .getOperatorContext() diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java index edb2de5272c..4ee62b7dbd4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java @@ -193,7 +193,8 @@ public class OperatorMemoryTest { alignedPath, Ordering.ASC, SeriesScanOptions.getDefaultSeriesScanOptions(alignedPath), - false); + false, + null); long maxPeekMemory = Math.max(
