This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/multi_devices_fe
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/multi_devices_fe by
this push:
new 1f81b60faf3 temp
1f81b60faf3 is described below
commit 1f81b60faf38a68e7895bc66ebde7264df3a75b4
Author: Beyyes <[email protected]>
AuthorDate: Mon Nov 6 11:20:30 2023 +0800
temp
---
.../queryengine/plan/analyze/AnalyzeVisitor.java | 11 +--
.../plan/analyze/TemplatedDeviceAnalyze.java | 109 ++++++++++++++++++---
2 files changed, 101 insertions(+), 19 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index a73c43f0e2b..aa26492e0c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -209,7 +209,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
private static final Logger logger =
LoggerFactory.getLogger(AnalyzeVisitor.class);
- private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
protected static final Expression DEVICE_EXPRESSION =
TimeSeriesOperand.constructColumnHeaderExpression(DEVICE,
TSDataType.TEXT);
@@ -280,12 +280,11 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
List<Pair<Expression, String>> outputExpressions;
if (queryStatement.isAlignByDevice()) {
+ // TODO examine that if all devices are in same template
if (!queryStatement.isAggregationQuery()) {
- analysis =
- TemplatedDeviceAnalyze.visitQuery(analysis, queryStatement,
context, schemaTree);
- // fetch partition information
- analyzeDataPartition(analysis, queryStatement, schemaTree);
- return analysis;
+ return new TemplatedDeviceAnalyze(
+ analysis, queryStatement, context, schemaTree,
partitionFetcher)
+ .visitQuery();
}
List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
index 30b7212a3d7..55be396d75a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
@@ -19,6 +19,9 @@
package org.apache.iotdb.db.queryengine.plan.analyze;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.sql.MeasurementNotExistException;
@@ -27,6 +30,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
@@ -35,6 +39,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
@@ -42,6 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -55,6 +61,8 @@ import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState;
import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
+import static
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.CONFIG;
import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.DEVICE_EXPRESSION;
import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.END_TIME_EXPRESSION;
import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.WHERE_WRONG_TYPE_ERROR_MSG;
@@ -62,6 +70,7 @@ import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyz
import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeExpressionType;
import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeOutput;
import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.checkDeviceViewInputUniqueness;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTimePartitionSlotList;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.normalizeExpression;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchAggregationExpressions;
@@ -82,23 +91,41 @@ public class TemplatedDeviceAnalyze {
private static final Logger logger =
LoggerFactory.getLogger(TemplatedDeviceAnalyze.class);
- /**
- * 并行 最上层比如加sort fe 线程管理
- *
- * @param queryStatement
- * @param context
- * @return
- */
- static Analysis visitQuery(
+ private boolean isWildCardQuery;
+
+ private boolean isOriginalMeasurementQuery;
+
+ private Analysis analysis;
+
+ private QueryStatement queryStatement;
+
+ private MPPQueryContext context;
+
+ private ISchemaTree schemaTree;
+
+ private final IPartitionFetcher partitionFetcher;
+
+ public TemplatedDeviceAnalyze(
Analysis analysis,
QueryStatement queryStatement,
MPPQueryContext context,
- ISchemaTree schemaTree) {
+ ISchemaTree schemaTree,
+ IPartitionFetcher partitionFetcher) {
+ this.analysis = analysis;
+ this.queryStatement = queryStatement;
+ this.context = context;
+ this.schemaTree = schemaTree;
+ this.partitionFetcher = partitionFetcher;
+ }
+
+ /** 并行 最上层比如加sort fe 线程管理 */
+ public Analysis visitQuery() {
long startTime = System.currentTimeMillis();
List<Pair<Expression, String>> outputExpressions;
+ // TODO-1 change the return value of this method, return `deviceList +
template`
List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
logger.warn("----- Analyze analyzeFrom cost: {}ms",
System.currentTimeMillis() - startTime);
startTime = System.currentTimeMillis();
@@ -108,6 +135,7 @@ public class TemplatedDeviceAnalyze {
deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList,
queryStatement);
}
+ //
analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList);
logger.warn(
"----- Analyze analyzeDeviceToWhere cost: {}ms",
System.currentTimeMillis() - startTime);
@@ -129,9 +157,9 @@ public class TemplatedDeviceAnalyze {
// analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree,
deviceList);
// analyzeHaving(analysis, queryStatement, schemaTree, deviceList);
// analyzeDeviceToAggregation(analysis, queryStatement);
- // analyzeDeviceToSourceTransform(analysis, queryStatement);
+ analyzeDeviceToSourceTransform(analysis, queryStatement);
// analyzeDeviceToSource(analysis, queryStatement);
-
analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());
+
analysis.setDeviceToSourceExpressions(analysis.getDeviceToSelectExpressions());
analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions());
logger.warn(
@@ -153,6 +181,8 @@ public class TemplatedDeviceAnalyze {
// generate result set header according to output expressions
analyzeOutput(analysis, queryStatement, outputExpressions);
+ // fetch partition information
+ analyzeDataPartition(analysis, queryStatement, schemaTree);
logger.warn(
"----- Analyze analyzeOutput+analyzeDataPartition cost: {}ms",
System.currentTimeMillis() - startTime);
@@ -185,13 +215,13 @@ public class TemplatedDeviceAnalyze {
Analysis analysis,
QueryStatement queryStatement,
ISchemaTree schemaTree,
- List<PartialPath> deviceSet) {
+ List<PartialPath> deviceList) {
if (!queryStatement.hasWhere()) {
return;
}
Map<String, Expression> deviceToWhereExpression = new HashMap<>();
- Iterator<PartialPath> deviceIterator = deviceSet.iterator();
+ Iterator<PartialPath> deviceIterator = deviceList.iterator();
while (deviceIterator.hasNext()) {
PartialPath devicePath = deviceIterator.next();
Expression whereExpression;
@@ -321,6 +351,8 @@ public class TemplatedDeviceAnalyze {
}
}
} else {
+
analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());
+
updateDeviceToSourceTransformAndOutputExpressions(
analysis, analysis.getDeviceToSelectExpressions());
if (queryStatement.hasOrderByExpression()) {
@@ -477,4 +509,55 @@ public class TemplatedDeviceAnalyze {
analysis.setDeviceToSourceExpressions(deviceToSourceExpressions);
analysis.setOutputDeviceToQueriedDevicesMap(outputDeviceToQueriedDevicesMap);
}
+
+ private void analyzeDataPartition(
+ Analysis analysis, QueryStatement queryStatement, ISchemaTree
schemaTree) {
+ Set<String> deviceSet = new HashSet<>();
+ if (queryStatement.isAlignByDevice()) {
+ deviceSet =
+ analysis.getDeviceList().stream()
+ .map(PartialPath::getFullPath)
+ .collect(Collectors.toSet());
+ } else {
+ for (Expression expression : analysis.getSourceExpressions()) {
+
deviceSet.add(ExpressionAnalyzer.getDeviceNameInSourceExpression(expression));
+ }
+ }
+ DataPartition dataPartition =
+ fetchDataPartitionByDevices(deviceSet, schemaTree,
analysis.getGlobalTimeFilter());
+ analysis.setDataPartitionInfo(dataPartition);
+ }
+
+ private DataPartition fetchDataPartitionByDevices(
+ Set<String> deviceSet, ISchemaTree schemaTree, Filter globalTimeFilter) {
+ long startTime = System.nanoTime();
+ try {
+ Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> res =
+ getTimePartitionSlotList(globalTimeFilter);
+ // there is no satisfied time range
+ if (res.left.isEmpty() && Boolean.FALSE.equals(res.right.left)) {
+ return new DataPartition(
+ Collections.emptyMap(),
+ CONFIG.getSeriesPartitionExecutorClass(),
+ CONFIG.getSeriesPartitionSlotNum());
+ }
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new
HashMap<>();
+ for (String devicePath : deviceSet) {
+ DataPartitionQueryParam queryParam =
+ new DataPartitionQueryParam(devicePath, res.left, res.right.left,
res.right.right);
+ sgNameToQueryParamsMap
+ .computeIfAbsent(schemaTree.getBelongedDatabase(devicePath), key
-> new ArrayList<>())
+ .add(queryParam);
+ }
+
+ if (res.right.left || res.right.right) {
+ return
partitionFetcher.getDataPartitionWithUnclosedTimeRange(sgNameToQueryParamsMap);
+ } else {
+ return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+ }
+ } finally {
+ QueryPlanCostMetricSet.getInstance()
+ .recordPlanCost(PARTITION_FETCHER, System.nanoTime() - startTime);
+ }
+ }
}