This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/multi_devices_fe
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/multi_devices_fe by
this push:
new 4f6cdee97b8 perfect align by device
4f6cdee97b8 is described below
commit 4f6cdee97b880dbf6bdd39062129ff0bf509f7d7
Author: Beyyes <[email protected]>
AuthorDate: Mon Nov 6 16:20:20 2023 +0800
perfect align by device
---
.../queryengine/plan/analyze/AnalyzeVisitor.java | 22 +----
.../plan/analyze/TemplatedDeviceAnalyze.java | 108 +++++++++++++--------
.../queryengine/plan/execution/QueryExecution.java | 4 +-
3 files changed, 72 insertions(+), 62 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index aa26492e0c8..bdde20142b4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -262,8 +262,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
ISchemaTree schemaTree = analyzeSchema(queryStatement, analysis,
context);
- logger.warn("----- Analyze analyzeSchema cost: {}ms",
System.currentTimeMillis() - startTime);
- startTime = System.currentTimeMillis();
+ logger.warn("--- [analyzeSchema] : {}ms", System.currentTimeMillis() -
startTime);
// If there is no leaf node in the schema tree, the query should be
completed immediately
if (schemaTree.isEmpty()) {
@@ -296,13 +295,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList);
- outputExpressions =
- TemplatedDeviceAnalyze.analyzeSelectUseTemplate(
- analysis, queryStatement, schemaTree, deviceList);
-
- logger.warn(
- "----- Analyze analyzeSelect cost: {}ms",
System.currentTimeMillis() - startTime);
- startTime = System.currentTimeMillis();
+ outputExpressions = analyzeSelect(analysis, queryStatement,
schemaTree, deviceList);
if (deviceList.isEmpty()) {
return finishQuery(queryStatement, analysis);
@@ -316,18 +309,10 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analyzeDeviceToAggregation(analysis, queryStatement);
analyzeDeviceToSourceTransform(analysis, queryStatement);
analyzeDeviceToSource(analysis, queryStatement);
- logger.warn(
- "----- Analyze analyzeDeviceToSource +
analyzeDeviceToSourceTransform cost: {}ms",
- System.currentTimeMillis() - startTime);
- startTime = System.currentTimeMillis();
analyzeDeviceViewOutput(analysis, queryStatement);
analyzeDeviceViewInput(analysis, queryStatement);
- logger.warn(
- "----- Analyze analyzeDeviceView cost: {}ms",
System.currentTimeMillis() - startTime);
- startTime = System.currentTimeMillis();
-
analyzeInto(analysis, queryStatement, deviceList, outputExpressions);
} else {
Map<Integer, List<Pair<Expression, String>>> outputExpressionMap =
@@ -377,9 +362,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
// fetch partition information
analyzeDataPartition(analysis, queryStatement, schemaTree);
- logger.warn(
- "----- Analyze analyzeOutput+analyzeDataPartition cost: {}ms",
- System.currentTimeMillis() - startTime);
} catch (StatementAnalyzeException e) {
throw new StatementAnalyzeException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
index 55be396d75a..fc5cd296c75 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
@@ -59,7 +59,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import static com.google.common.base.Preconditions.checkState;
import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
import static
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER;
import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.CONFIG;
@@ -116,6 +115,19 @@ public class TemplatedDeviceAnalyze {
this.context = context;
this.schemaTree = schemaTree;
this.partitionFetcher = partitionFetcher;
+
+ if (queryStatement.getSelectComponent().getResultColumns().size() == 1) {
+ if ("*"
+ .equals(
+ queryStatement
+ .getSelectComponent()
+ .getResultColumns()
+ .get(0)
+ .getExpression()
+ .getOutputSymbol())) {
+ isWildCardQuery = true;
+ }
+ }
}
/** 并行 最上层比如加sort fe 线程管理 */
@@ -127,7 +139,7 @@ public class TemplatedDeviceAnalyze {
// TODO-1 change the return value of this method, return `deviceList +
template`
List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
- logger.warn("----- Analyze analyzeFrom cost: {}ms",
System.currentTimeMillis() - startTime);
+ logger.warn("--- [analyzeFrom] : {}ms", System.currentTimeMillis() -
startTime);
startTime = System.currentTimeMillis();
if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) {
@@ -135,17 +147,13 @@ public class TemplatedDeviceAnalyze {
deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList,
queryStatement);
}
- //
+ // TODO-2 optimize for where filter using template
analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList);
- logger.warn(
- "----- Analyze analyzeDeviceToWhere cost: {}ms",
System.currentTimeMillis() - startTime);
+ logger.warn("--- [analyzeDeviceToWhere] : {}ms",
System.currentTimeMillis() - startTime);
startTime = System.currentTimeMillis();
- outputExpressions =
- TemplatedDeviceAnalyze.analyzeSelectUseTemplate(
- analysis, queryStatement, schemaTree, deviceList);
-
- logger.warn("----- Analyze analyzeSelect cost: {}ms",
System.currentTimeMillis() - startTime);
+ outputExpressions = analyzeSelectUseTemplate(analysis, queryStatement,
schemaTree, deviceList);
+ logger.warn("--- [analyzeSelect] : {}ms", System.currentTimeMillis() -
startTime);
startTime = System.currentTimeMillis();
if (deviceList.isEmpty()) {
@@ -158,20 +166,17 @@ public class TemplatedDeviceAnalyze {
// analyzeHaving(analysis, queryStatement, schemaTree, deviceList);
// analyzeDeviceToAggregation(analysis, queryStatement);
analyzeDeviceToSourceTransform(analysis, queryStatement);
- // analyzeDeviceToSource(analysis, queryStatement);
+ analyzeDeviceToSource(analysis, queryStatement);
-
analysis.setDeviceToSourceExpressions(analysis.getDeviceToSelectExpressions());
-
analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions());
logger.warn(
- "----- Analyze analyzeDeviceToSource + analyzeDeviceToSourceTransform
cost: {}ms",
+ "--- [analyzeDeviceToSource + analyzeDeviceToSourceTransform] : {}ms",
System.currentTimeMillis() - startTime);
startTime = System.currentTimeMillis();
analyzeDeviceViewOutput(analysis, queryStatement);
analyzeDeviceViewInput(analysis, queryStatement);
- logger.warn(
- "----- Analyze analyzeDeviceView cost: {}ms",
System.currentTimeMillis() - startTime);
+ logger.warn("--- [analyzeDeviceView] : {}ms", System.currentTimeMillis() -
startTime);
startTime = System.currentTimeMillis();
// analyzeInto(analysis, queryStatement, deviceList, outputExpressions);
@@ -180,12 +185,12 @@ public class TemplatedDeviceAnalyze {
// generate result set header according to output expressions
analyzeOutput(analysis, queryStatement, outputExpressions);
+ logger.warn("--- [analyzeOutput] : {}ms", System.currentTimeMillis() -
startTime);
+ startTime = System.currentTimeMillis();
// fetch partition information
analyzeDataPartition(analysis, queryStatement, schemaTree);
- logger.warn(
- "----- Analyze analyzeOutput+analyzeDataPartition cost: {}ms",
- System.currentTimeMillis() - startTime);
+ logger.warn("--- [analyzeDataPartition] : {}ms",
System.currentTimeMillis() - startTime);
return analysis;
}
@@ -312,8 +317,7 @@ public class TemplatedDeviceAnalyze {
return outputExpressions;
}
- private static void analyzeDeviceToSourceTransform(
- Analysis analysis, QueryStatement queryStatement) {
+ private void analyzeDeviceToSourceTransform(Analysis analysis,
QueryStatement queryStatement) {
if (queryStatement.isAggregationQuery()) {
Map<String, Set<Expression>> deviceToSourceTransformExpressions =
analysis.getDeviceToSourceTransformExpressions();
@@ -351,13 +355,15 @@ public class TemplatedDeviceAnalyze {
}
}
} else {
-
analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());
-
- updateDeviceToSourceTransformAndOutputExpressions(
- analysis, analysis.getDeviceToSelectExpressions());
- if (queryStatement.hasOrderByExpression()) {
+ if (isWildCardQuery || isOriginalMeasurementQuery) {
+
analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());
+ } else {
updateDeviceToSourceTransformAndOutputExpressions(
- analysis, analysis.getDeviceToOrderByExpressions());
+ analysis, analysis.getDeviceToSelectExpressions());
+ if (queryStatement.hasOrderByExpression()) {
+ updateDeviceToSourceTransformAndOutputExpressions(
+ analysis, analysis.getDeviceToOrderByExpressions());
+ }
}
}
}
@@ -411,7 +417,8 @@ public class TemplatedDeviceAnalyze {
}
}
} else {
- deviceViewOutputExpressions.addAll(selectExpressions);
+ // TODO can also just set, instead of addAll in normal process?
+ deviceViewOutputExpressions = selectExpressions;
if (queryStatement.hasOrderByExpression()) {
deviceViewOutputExpressions.addAll(analysis.getOrderByExpressions());
}
@@ -421,7 +428,21 @@ public class TemplatedDeviceAnalyze {
analyzeDeviceViewSpecialProcess(deviceViewOutputExpressions,
queryStatement, analysis));
}
- private static void analyzeDeviceViewInput(Analysis analysis, QueryStatement
queryStatement) {
+ private void analyzeDeviceViewInput(Analysis analysis, QueryStatement
queryStatement) {
+ if (isWildCardQuery || isOriginalMeasurementQuery) {
+ List<Integer> indexes = new ArrayList<>();
+ // index-0 is `Device`
+ for (int i = 1; i < analysis.getSelectExpressions().size(); i++) {
+ indexes.add(i);
+ }
+ Map<String, List<Integer>> deviceViewInputIndexesMap = new HashMap<>();
+ for (PartialPath devicePath : analysis.getDeviceList()) {
+ deviceViewInputIndexesMap.put(devicePath.getFullPath(), indexes);
+ }
+ analysis.setDeviceViewInputIndexesMap(deviceViewInputIndexesMap);
+ return;
+ }
+
List<String> deviceViewOutputColumns =
analysis.getDeviceViewOutputExpressions().stream()
.map(Expression::getOutputSymbol)
@@ -430,9 +451,8 @@ public class TemplatedDeviceAnalyze {
Map<String, Set<String>> deviceToOutputColumnsMap = new LinkedHashMap<>();
Map<String, Set<Expression>> deviceToOutputExpressions =
analysis.getDeviceToOutputExpressions();
- for (Map.Entry<String, Set<Expression>> deviceOutputExpressionEntry :
- deviceToOutputExpressions.entrySet()) {
- Set<Expression> outputExpressionsUnderDevice =
deviceOutputExpressionEntry.getValue();
+ for (Map.Entry<String, Set<Expression>> entry :
deviceToOutputExpressions.entrySet()) {
+ Set<Expression> outputExpressionsUnderDevice = entry.getValue();
checkDeviceViewInputUniqueness(outputExpressionsUnderDevice);
Set<String> outputColumns = new LinkedHashSet<>();
@@ -442,20 +462,22 @@ public class TemplatedDeviceAnalyze {
for (Expression expression : outputExpressionsUnderDevice) {
outputColumns.add(getMeasurementExpression(expression,
analysis).getOutputSymbol());
}
- deviceToOutputColumnsMap.put(deviceOutputExpressionEntry.getKey(),
outputColumns);
+ deviceToOutputColumnsMap.put(entry.getKey(), outputColumns);
}
Map<String, List<Integer>> deviceViewInputIndexesMap = new HashMap<>();
- for (Map.Entry<String, Set<String>> deviceOutputColumnsEntry :
- deviceToOutputColumnsMap.entrySet()) {
- String deviceName = deviceOutputColumnsEntry.getKey();
- List<String> outputsUnderDevice = new
ArrayList<>(deviceOutputColumnsEntry.getValue());
+ for (Map.Entry<String, Set<String>> entry :
deviceToOutputColumnsMap.entrySet()) {
+ String deviceName = entry.getKey();
+ List<String> outputsUnderDevice = new ArrayList<>(entry.getValue());
List<Integer> indexes = new ArrayList<>();
for (String output : outputsUnderDevice) {
int index = deviceViewOutputColumns.indexOf(output);
- checkState(
- index >= 1, "output column '%s' is not stored in %s", output,
deviceViewOutputColumns);
+ if (index < 1) {
+ throw new IllegalStateException(
+ String.format(
+ "output column '%s' is not stored in %s", output,
deviceViewOutputColumns));
+ }
indexes.add(index);
}
deviceViewInputIndexesMap.put(deviceName, indexes);
@@ -463,7 +485,13 @@ public class TemplatedDeviceAnalyze {
analysis.setDeviceViewInputIndexesMap(deviceViewInputIndexesMap);
}
- private static void analyzeDeviceToSource(Analysis analysis, QueryStatement
queryStatement) {
+ private void analyzeDeviceToSource(Analysis analysis, QueryStatement
queryStatement) {
+ if (isWildCardQuery || isOriginalMeasurementQuery) {
+
analysis.setDeviceToSourceExpressions(analysis.getDeviceToSelectExpressions());
+
analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions());
+ return;
+ }
+
Map<String, Set<Expression>> deviceToSourceExpressions = new HashMap<>();
Map<String, Set<Expression>> deviceToSourceTransformExpressions =
analysis.getDeviceToSourceTransformExpressions();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index e9c68fd6464..3b02fea3b7b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -221,11 +221,11 @@ public class QueryExecution implements IQueryExecution {
// check timeout for query first
checkTimeOutForQuery();
doLogicalPlan();
- logger.warn("----- doLogicalPlan cost: {}ms", System.currentTimeMillis() -
sTime);
+ logger.warn("--- [doLogicalPlan] : {}ms", System.currentTimeMillis() -
sTime);
sTime = System.currentTimeMillis();
doDistributedPlan();
- logger.warn("----- doDistributedPlan cost: {}ms",
System.currentTimeMillis() - sTime);
+ logger.warn("--- [doDistributedPlan] : {}ms", System.currentTimeMillis() -
sTime);
// update timeout after finishing plan stage
context.setTimeOut(