This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/multi_devices_fe
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/multi_devices_fe by 
this push:
     new 1f81b60faf3 temp
1f81b60faf3 is described below

commit 1f81b60faf38a68e7895bc66ebde7264df3a75b4
Author: Beyyes <[email protected]>
AuthorDate: Mon Nov 6 11:20:30 2023 +0800

    temp
---
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  11 +--
 .../plan/analyze/TemplatedDeviceAnalyze.java       | 109 ++++++++++++++++++---
 2 files changed, 101 insertions(+), 19 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index a73c43f0e2b..aa26492e0c8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -209,7 +209,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
 
   private static final Logger logger = 
LoggerFactory.getLogger(AnalyzeVisitor.class);
 
-  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+  static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
 
   protected static final Expression DEVICE_EXPRESSION =
       TimeSeriesOperand.constructColumnHeaderExpression(DEVICE, 
TSDataType.TEXT);
@@ -280,12 +280,11 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       List<Pair<Expression, String>> outputExpressions;
       if (queryStatement.isAlignByDevice()) {
 
+        // TODO examine that if all devices are in same template
         if (!queryStatement.isAggregationQuery()) {
-          analysis =
-              TemplatedDeviceAnalyze.visitQuery(analysis, queryStatement, 
context, schemaTree);
-          // fetch partition information
-          analyzeDataPartition(analysis, queryStatement, schemaTree);
-          return analysis;
+          return new TemplatedDeviceAnalyze(
+                  analysis, queryStatement, context, schemaTree, 
partitionFetcher)
+              .visitQuery();
         }
 
         List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
index 30b7212a3d7..55be396d75a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.queryengine.plan.analyze;
 
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.sql.MeasurementNotExistException;
@@ -27,6 +30,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
 import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
 import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
@@ -35,6 +39,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
 import org.apache.iotdb.db.schemaengine.template.Template;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
@@ -42,6 +47,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -55,6 +61,8 @@ import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
+import static 
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.CONFIG;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.DEVICE_EXPRESSION;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.END_TIME_EXPRESSION;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.WHERE_WRONG_TYPE_ERROR_MSG;
@@ -62,6 +70,7 @@ import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyz
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeExpressionType;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeOutput;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.checkDeviceViewInputUniqueness;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.getTimePartitionSlotList;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.normalizeExpression;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchAggregationExpressions;
@@ -82,23 +91,41 @@ public class TemplatedDeviceAnalyze {
 
   private static final Logger logger = 
LoggerFactory.getLogger(TemplatedDeviceAnalyze.class);
 
-  /**
-   * 并行 最上层比如加sort fe 线程管理
-   *
-   * @param queryStatement
-   * @param context
-   * @return
-   */
-  static Analysis visitQuery(
+  private boolean isWildCardQuery;
+
+  private boolean isOriginalMeasurementQuery;
+
+  private Analysis analysis;
+
+  private QueryStatement queryStatement;
+
+  private MPPQueryContext context;
+
+  private ISchemaTree schemaTree;
+
+  private final IPartitionFetcher partitionFetcher;
+
+  public TemplatedDeviceAnalyze(
       Analysis analysis,
       QueryStatement queryStatement,
       MPPQueryContext context,
-      ISchemaTree schemaTree) {
+      ISchemaTree schemaTree,
+      IPartitionFetcher partitionFetcher) {
+    this.analysis = analysis;
+    this.queryStatement = queryStatement;
+    this.context = context;
+    this.schemaTree = schemaTree;
+    this.partitionFetcher = partitionFetcher;
+  }
+
+  /** 并行 最上层比如加sort fe 线程管理 */
+  public Analysis visitQuery() {
 
     long startTime = System.currentTimeMillis();
 
     List<Pair<Expression, String>> outputExpressions;
 
+    // TODO-1 change the return value of this method, return `deviceList + 
template`
     List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);
     logger.warn("----- Analyze analyzeFrom cost: {}ms", 
System.currentTimeMillis() - startTime);
     startTime = System.currentTimeMillis();
@@ -108,6 +135,7 @@ public class TemplatedDeviceAnalyze {
       deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList, 
queryStatement);
     }
 
+    //
     analyzeDeviceToWhere(analysis, queryStatement, schemaTree, deviceList);
     logger.warn(
         "----- Analyze analyzeDeviceToWhere cost: {}ms", 
System.currentTimeMillis() - startTime);
@@ -129,9 +157,9 @@ public class TemplatedDeviceAnalyze {
     // analyzeDeviceToOrderBy(analysis, queryStatement, schemaTree, 
deviceList);
     // analyzeHaving(analysis, queryStatement, schemaTree, deviceList);
     // analyzeDeviceToAggregation(analysis, queryStatement);
-    // analyzeDeviceToSourceTransform(analysis, queryStatement);
+    analyzeDeviceToSourceTransform(analysis, queryStatement);
     // analyzeDeviceToSource(analysis, queryStatement);
-    
analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());
+
     
analysis.setDeviceToSourceExpressions(analysis.getDeviceToSelectExpressions());
     
analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions());
     logger.warn(
@@ -153,6 +181,8 @@ public class TemplatedDeviceAnalyze {
     // generate result set header according to output expressions
     analyzeOutput(analysis, queryStatement, outputExpressions);
 
+    // fetch partition information
+    analyzeDataPartition(analysis, queryStatement, schemaTree);
     logger.warn(
         "----- Analyze analyzeOutput+analyzeDataPartition cost: {}ms",
         System.currentTimeMillis() - startTime);
@@ -185,13 +215,13 @@ public class TemplatedDeviceAnalyze {
       Analysis analysis,
       QueryStatement queryStatement,
       ISchemaTree schemaTree,
-      List<PartialPath> deviceSet) {
+      List<PartialPath> deviceList) {
     if (!queryStatement.hasWhere()) {
       return;
     }
 
     Map<String, Expression> deviceToWhereExpression = new HashMap<>();
-    Iterator<PartialPath> deviceIterator = deviceSet.iterator();
+    Iterator<PartialPath> deviceIterator = deviceList.iterator();
     while (deviceIterator.hasNext()) {
       PartialPath devicePath = deviceIterator.next();
       Expression whereExpression;
@@ -321,6 +351,8 @@ public class TemplatedDeviceAnalyze {
         }
       }
     } else {
+      
analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());
+
       updateDeviceToSourceTransformAndOutputExpressions(
           analysis, analysis.getDeviceToSelectExpressions());
       if (queryStatement.hasOrderByExpression()) {
@@ -477,4 +509,55 @@ public class TemplatedDeviceAnalyze {
     analysis.setDeviceToSourceExpressions(deviceToSourceExpressions);
     
analysis.setOutputDeviceToQueriedDevicesMap(outputDeviceToQueriedDevicesMap);
   }
+
+  private void analyzeDataPartition(
+      Analysis analysis, QueryStatement queryStatement, ISchemaTree 
schemaTree) {
+    Set<String> deviceSet = new HashSet<>();
+    if (queryStatement.isAlignByDevice()) {
+      deviceSet =
+          analysis.getDeviceList().stream()
+              .map(PartialPath::getFullPath)
+              .collect(Collectors.toSet());
+    } else {
+      for (Expression expression : analysis.getSourceExpressions()) {
+        
deviceSet.add(ExpressionAnalyzer.getDeviceNameInSourceExpression(expression));
+      }
+    }
+    DataPartition dataPartition =
+        fetchDataPartitionByDevices(deviceSet, schemaTree, 
analysis.getGlobalTimeFilter());
+    analysis.setDataPartitionInfo(dataPartition);
+  }
+
+  private DataPartition fetchDataPartitionByDevices(
+      Set<String> deviceSet, ISchemaTree schemaTree, Filter globalTimeFilter) {
+    long startTime = System.nanoTime();
+    try {
+      Pair<List<TTimePartitionSlot>, Pair<Boolean, Boolean>> res =
+          getTimePartitionSlotList(globalTimeFilter);
+      // there is no satisfied time range
+      if (res.left.isEmpty() && Boolean.FALSE.equals(res.right.left)) {
+        return new DataPartition(
+            Collections.emptyMap(),
+            CONFIG.getSeriesPartitionExecutorClass(),
+            CONFIG.getSeriesPartitionSlotNum());
+      }
+      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new 
HashMap<>();
+      for (String devicePath : deviceSet) {
+        DataPartitionQueryParam queryParam =
+            new DataPartitionQueryParam(devicePath, res.left, res.right.left, 
res.right.right);
+        sgNameToQueryParamsMap
+            .computeIfAbsent(schemaTree.getBelongedDatabase(devicePath), key 
-> new ArrayList<>())
+            .add(queryParam);
+      }
+
+      if (res.right.left || res.right.right) {
+        return 
partitionFetcher.getDataPartitionWithUnclosedTimeRange(sgNameToQueryParamsMap);
+      } else {
+        return partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
+      }
+    } finally {
+      QueryPlanCostMetricSet.getInstance()
+          .recordPlanCost(PARTITION_FETCHER, System.nanoTime() - startTime);
+    }
+  }
 }

Reply via email to