This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/multi_devices_fe
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/multi_devices_fe by
this push:
new 00415c95717 add template
00415c95717 is described below
commit 00415c95717042261ba7ac41b70cc42966c8b037
Author: Beyyes <[email protected]>
AuthorDate: Thu Oct 26 23:49:24 2023 +0800
add template
---
.../queryengine/plan/analyze/AnalyzeVisitor.java | 62 ++++++--
.../plan/analyze/TemplatedDeviceAnalyze.java | 164 +++++++++++++++++++++
.../queryengine/plan/execution/QueryExecution.java | 9 +-
3 files changed, 217 insertions(+), 18 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index fc8f341d8d0..d40cd576319 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -211,10 +211,10 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- private static final Expression DEVICE_EXPRESSION =
+ protected static final Expression DEVICE_EXPRESSION =
TimeSeriesOperand.constructColumnHeaderExpression(DEVICE,
TSDataType.TEXT);
- private static final Expression END_TIME_EXPRESSION =
+ protected static final Expression END_TIME_EXPRESSION =
TimeSeriesOperand.constructColumnHeaderExpression(ENDTIME,
TSDataType.INT64);
private final List<String> lastQueryColumnNames =
@@ -251,6 +251,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
public Analysis visitQuery(QueryStatement queryStatement, MPPQueryContext
context) {
Analysis analysis = new Analysis();
analysis.setLastLevelUseWildcard(queryStatement.isLastLevelUseWildcard());
+
+ long startTime = System.currentTimeMillis();
try {
// check for semantic errors
queryStatement.semanticCheck();
@@ -260,6 +262,10 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
ISchemaTree schemaTree = analyzeSchema(queryStatement, analysis,
context);
+
+ logger.warn("----- Analyze analyzeSchema cost: {}ms",
System.currentTimeMillis() - startTime);
+ startTime = System.currentTimeMillis();
+
// If there is no leaf node in the schema tree, the query should be
completed immediately
if (schemaTree.isEmpty()) {
return finishQuery(queryStatement, analysis);
@@ -279,6 +285,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
// fe 线程管理
//
List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
+ logger.warn("----- Analyze analyzeFrom cost: {}ms",
System.currentTimeMillis() - startTime);
+ startTime = System.currentTimeMillis();
if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) {
// remove the device which won't appear in resultSet after
limit/offset
@@ -286,7 +294,18 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList);
- outputExpressions = analyzeSelect(analysis, queryStatement,
schemaTree, deviceList);
+ logger.warn(
+ "----- Analyze analyzeDeviceToWhere cost: {}ms",
+ System.currentTimeMillis() - startTime);
+ startTime = System.currentTimeMillis();
+
+ outputExpressions =
+ TemplatedDeviceAnalyze.analyzeSelect(analysis, queryStatement,
schemaTree, deviceList);
+
+ logger.warn(
+ "----- Analyze analyzeSelect cost: {}ms",
System.currentTimeMillis() - startTime);
+ startTime = System.currentTimeMillis();
+
if (deviceList.isEmpty()) {
return finishQuery(queryStatement, analysis);
}
@@ -299,10 +318,18 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analyzeDeviceToAggregation(analysis, queryStatement);
analyzeDeviceToSourceTransform(analysis, queryStatement);
analyzeDeviceToSource(analysis, queryStatement);
+ logger.warn(
+ "----- Analyze analyzeDeviceToSource cost: {}ms",
+ System.currentTimeMillis() - startTime);
+ startTime = System.currentTimeMillis();
analyzeDeviceViewOutput(analysis, queryStatement);
analyzeDeviceViewInput(analysis, queryStatement);
+ logger.warn(
+ "----- Analyze analyzeDeviceView cost: {}ms",
System.currentTimeMillis() - startTime);
+ startTime = System.currentTimeMillis();
+
analyzeInto(analysis, queryStatement, deviceList, outputExpressions);
} else {
Map<Integer, List<Pair<Expression, String>>> outputExpressionMap =
@@ -352,7 +379,10 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
// fetch partition information
analyzeDataPartition(analysis, queryStatement, schemaTree);
-
+ logger.warn(
+ "----- Analyze analyzeOutput+analyzeDataPartition cost: {}ms",
+ System.currentTimeMillis() - startTime);
+ startTime = System.currentTimeMillis();
} catch (StatementAnalyzeException e) {
throw new StatementAnalyzeException(
"Meet error when analyzing the query statement: " + e.getMessage());
@@ -633,12 +663,14 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Set<PartialPath> deviceSet = new HashSet<>();
for (PartialPath devicePattern : devicePatternList) {
// get all matched devices
+ // TODO isPrefixMatch可否设置为false? analyzeFrom能否直接返回schemaTree的全部devices?
deviceSet.addAll(
- schemaTree.getMatchedDevices(devicePattern).stream()
+ schemaTree.getMatchedDevices(devicePattern, false).stream()
.map(DeviceSchemaInfo::getDevicePath)
.collect(Collectors.toList()));
}
+ // TODO 是否一定要排序? 最终的sourceNodeList已经会排序?
return queryStatement.getResultDeviceOrder() == Ordering.ASC
? deviceSet.stream().sorted().collect(Collectors.toList())
:
deviceSet.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
@@ -649,7 +681,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- List<PartialPath> deviceSet) {
+ List<PartialPath> deviceList) {
List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
Map<String, Set<Expression>> deviceToSelectExpressions = new HashMap<>();
ColumnPaginationController paginationController =
@@ -663,7 +695,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
// use LinkedHashMap for order-preserving
Map<Expression, Map<String, Expression>>
measurementToDeviceSelectExpressions =
new LinkedHashMap<>();
- for (PartialPath device : deviceSet) {
+ for (PartialPath device : deviceList) {
List<Expression> selectExpressionsOfOneDevice =
concatDeviceAndBindSchemaForExpression(selectExpression, device,
schemaTree);
if (selectExpressionsOfOneDevice.isEmpty()) {
@@ -718,12 +750,12 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
// remove devices without measurements to compute
Set<PartialPath> noMeasurementDevices = new HashSet<>();
- for (PartialPath device : deviceSet) {
+ for (PartialPath device : deviceList) {
if (!deviceToSelectExpressions.containsKey(device.getFullPath())) {
noMeasurementDevices.add(device);
}
}
- deviceSet.removeAll(noMeasurementDevices);
+ deviceList.removeAll(noMeasurementDevices);
// when the select expression of any device is empty,
// the where expression map also need remove this device
@@ -745,7 +777,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
return outputExpressions;
}
- private void updateMeasurementToDeviceSelectExpressions(
+ protected static void updateMeasurementToDeviceSelectExpressions(
Analysis analysis,
Map<Expression, Map<String, Expression>>
measurementToDeviceSelectExpressions,
PartialPath device,
@@ -759,7 +791,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
}
- private void updateDeviceToSelectExpressions(
+ protected static void updateDeviceToSelectExpressions(
Analysis analysis,
Map<String, Set<Expression>> deviceToSelectExpressions,
Map<String, Expression> deviceToSelectExpressionsOfOneMeasurement) {
@@ -777,7 +809,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
}
- private String analyzeAlias(
+ protected static String analyzeAlias(
String resultColumnAlias,
Expression rawExpression,
Expression normalizedExpression,
@@ -1623,7 +1655,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
queryStatement.updateSortItems(orderByExpressions);
}
- private TSDataType analyzeExpressionType(Analysis analysis, Expression
expression) {
+ private static TSDataType analyzeExpressionType(Analysis analysis,
Expression expression) {
return analyzeExpression(analysis, expression);
}
@@ -2162,7 +2194,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
* <p>an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align
by device, return
* false while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
*/
- private void checkDataTypeConsistencyInAlignByDevice(
+ protected static void checkDataTypeConsistencyInAlignByDevice(
Analysis analysis, List<Expression> expressions) {
TSDataType checkedDataType = analysis.getType(expressions.get(0));
for (Expression expression : expressions) {
@@ -2183,7 +2215,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
}
- private void checkAliasUniqueness(
+ protected static void checkAliasUniqueness(
String alias, Map<Expression, Map<String, Expression>>
measurementToDeviceSelectExpressions) {
if (alias != null && measurementToDeviceSelectExpressions.keySet().size()
> 1) {
throw new SemanticException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
new file mode 100644
index 00000000000..888b65af453
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.analyze;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.statement.component.ResultColumn;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
+import org.apache.iotdb.db.schemaengine.template.Template;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.DEVICE_EXPRESSION;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.END_TIME_EXPRESSION;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeAlias;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.checkAliasUniqueness;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.updateDeviceToSelectExpressions;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.updateMeasurementToDeviceSelectExpressions;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.toLowerCaseExpression;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression;
+
+/**
+ * This class provides accelerated implementation for multiple devices align
by device query. This
+ * optimization is only used for devices with same template, using template
can avoid many
+ * unnecessary judgements.
+ *
+ * <p>e.g. for query `SELECT * FROM root.xx.** order by device/time/expression
align by device`, the
+ * device list of `root.xx.**` must use same template.
+ */
+public class TemplatedDeviceAnalyze {
+
+ protected static List<Pair<Expression, String>> analyzeSelect(
+ Analysis analysis,
+ QueryStatement queryStatement,
+ ISchemaTree schemaTree,
+ List<PartialPath> deviceList) {
+
+ Template template =
ClusterTemplateManager.getInstance().getAllTemplates().get(0);
+
+ List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
+ Map<String, Set<Expression>> deviceToSelectExpressions = new HashMap<>();
+ ColumnPaginationController paginationController =
+ new ColumnPaginationController(
+ queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset(),
false);
+
+ for (ResultColumn resultColumn :
queryStatement.getSelectComponent().getResultColumns()) {
+ Expression selectExpression = resultColumn.getExpression();
+
+ // select expression after removing wildcard, LinkedHashMap for
order-preserving
+ Map<Expression, Map<String, Expression>>
measurementToDeviceSelectExpressions =
+ new LinkedHashMap<>();
+ for (PartialPath device : deviceList) {
+ List<Expression> selectExpressionsOfOneDevice =
+ concatDeviceAndBindSchemaForExpression(selectExpression, device,
schemaTree);
+ if (selectExpressionsOfOneDevice.isEmpty()) {
+ continue;
+ }
+
+ updateMeasurementToDeviceSelectExpressions(
+ analysis, measurementToDeviceSelectExpressions, device,
selectExpressionsOfOneDevice);
+ }
+
+ checkAliasUniqueness(resultColumn.getAlias(),
measurementToDeviceSelectExpressions);
+
+ for (Map.Entry<Expression, Map<String, Expression>> entry :
+ measurementToDeviceSelectExpressions.entrySet()) {
+ Expression measurementExpression = entry.getKey();
+ Map<String, Expression> deviceToSelectExpressionsOfOneMeasurement =
entry.getValue();
+
+ if (paginationController.hasCurOffset()) {
+ paginationController.consumeOffset();
+ } else if (paginationController.hasCurLimit()) {
+ deviceToSelectExpressionsOfOneMeasurement
+ .values()
+ .forEach(expression -> analyzeExpression(analysis, expression));
+
+ // fix: devices used same template must have consistent type, no
need to
+ // checkDataTypeConsistency
+
+ Expression lowerCaseMeasurementExpression =
toLowerCaseExpression(measurementExpression);
+ analyzeExpression(analysis, lowerCaseMeasurementExpression);
+
+ outputExpressions.add(
+ new Pair<>(
+ lowerCaseMeasurementExpression,
+ analyzeAlias(
+ resultColumn.getAlias(),
+ measurementExpression,
+ lowerCaseMeasurementExpression,
+ queryStatement)));
+
+ updateDeviceToSelectExpressions(
+ analysis, deviceToSelectExpressions,
deviceToSelectExpressionsOfOneMeasurement);
+
+ paginationController.consumeLimit();
+ } else {
+ break;
+ }
+ }
+ }
+
+ removeDevicesWithoutMeasurements(deviceList, deviceToSelectExpressions,
analysis);
+
+ Set<Expression> selectExpressions = new LinkedHashSet<>();
+ selectExpressions.add(DEVICE_EXPRESSION);
+ if (queryStatement.isOutputEndTime()) {
+ selectExpressions.add(END_TIME_EXPRESSION);
+ }
+ outputExpressions.forEach(pair -> selectExpressions.add(pair.getLeft()));
+ analysis.setSelectExpressions(selectExpressions);
+ analysis.setDeviceToSelectExpressions(deviceToSelectExpressions);
+
+ return outputExpressions;
+ }
+
+ private static void removeDevicesWithoutMeasurements(
+ List<PartialPath> deviceList,
+ Map<String, Set<Expression>> deviceToSelectExpressions,
+ Analysis analysis) {
+ // remove devices without measurements to compute
+ Set<PartialPath> noMeasurementDevices = new HashSet<>();
+ for (PartialPath device : deviceList) {
+ if (!deviceToSelectExpressions.containsKey(device.getFullPath())) {
+ noMeasurementDevices.add(device);
+ }
+ }
+ deviceList.removeAll(noMeasurementDevices);
+
+ // when the select expression of any device is empty,
+ // the where expression map also need remove this device
+ if (analysis.getDeviceToWhereExpression() != null) {
+ noMeasurementDevices.forEach(
+ devicePath ->
analysis.getDeviceToWhereExpression().remove(devicePath.getFullPath()));
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 1fd4652a0bb..e9c68fd6464 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -217,10 +217,16 @@ public class QueryExecution implements IQueryExecution {
return;
}
+ long sTime = System.currentTimeMillis();
// check timeout for query first
checkTimeOutForQuery();
doLogicalPlan();
+ logger.warn("----- doLogicalPlan cost: {}ms", System.currentTimeMillis() -
sTime);
+ sTime = System.currentTimeMillis();
+
doDistributedPlan();
+ logger.warn("----- doDistributedPlan cost: {}ms",
System.currentTimeMillis() - sTime);
+
// update timeout after finishing plan stage
context.setTimeOut(
context.getTimeOut() - (System.currentTimeMillis() -
context.getStartTime()));
@@ -239,9 +245,6 @@ public class QueryExecution implements IQueryExecution {
if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
stateMachine.transitionToFailed(analysis.getFailStatus());
}
- logger.warn(
- "~~~~ Consume time in doLogicalPlan+doDistributionPlan: {}ns",
- System.nanoTime() - startTime);
}
private void checkTimeOutForQuery() {