This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/multi_devices_fe
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/multi_devices_fe by 
this push:
     new 496252d0233 optimze analyzeSelect, doLogicalPlan
496252d0233 is described below

commit 496252d02337208a8306fc5bcca5be84bd58b89d
Author: Beyyes <[email protected]>
AuthorDate: Tue Nov 7 17:55:38 2023 +0800

    optimze analyzeSelect, doLogicalPlan
---
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |  32 ++-
 .../plan/analyze/TemplatedDeviceAnalyze.java       |  22 +-
 .../plan/planner/LogicalPlanVisitor.java           |   4 +
 .../plan/planner/TemplatedLogicalPlan.java         | 262 +++++++++++++++++++++
 4 files changed, 305 insertions(+), 15 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index bdde20142b4..464d4c07afd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -57,6 +57,7 @@ import 
org.apache.iotdb.db.queryengine.common.header.ColumnHeader;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
+import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
 import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
 import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
 import org.apache.iotdb.db.queryengine.execution.operator.window.WindowType;
@@ -423,25 +424,48 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
 
     // request schema fetch API
     long startTime = System.nanoTime();
-    ISchemaTree schemaTree;
+    ISchemaTree schemaTree = null;
+    Template template = null;
     try {
       logger.debug("[StartFetchSchema]");
       if (queryStatement.isGroupByTag()) {
         schemaTree =
             
schemaFetcher.fetchSchemaWithTags(concatPathRewriter.getPatternTree(), context);
       } else {
-        schemaTree = 
schemaFetcher.fetchSchema(concatPathRewriter.getPatternTree(), context);
+        // TODO reinforce the judgement condition
+        //        if (queryStatement.isAlignByDevice() && 
!queryStatement.isAggregationQuery()) {
+        //          for (PartialPath partialPath :
+        // concatPathRewriter.getPatternTree().getAllDevicePaths()) {
+        //            Pair<Template, PartialPath> templateSetInfo =
+        // schemaFetcher.checkTemplateSetInfo(partialPath);
+        //            if (templateSetInfo != null) {
+        //              if (template == null) {
+        //                template = templateSetInfo.getLeft();
+        //              } else if (templateSetInfo.getLeft().getId() != 
template.getId()) {
+        //                template = null;
+        //                break;
+        //              }
+        //            }
+        //          }
+        //        }
+
+        if (template == null) {
+          schemaTree = 
schemaFetcher.fetchSchema(concatPathRewriter.getPatternTree(), context);
+        }
       }
 
       // make sure paths in logical view is fetched
-      updateSchemaTreeByViews(analysis, schemaTree);
+      if (template == null) {
+        updateSchemaTreeByViews(analysis, schemaTree);
+      }
     } finally {
       logger.debug("[EndFetchSchema]");
       QueryPlanCostMetricSet.getInstance()
           .recordPlanCost(SCHEMA_FETCHER, System.nanoTime() - startTime);
     }
+
     analysis.setSchemaTree(schemaTree);
-    return schemaTree;
+    return schemaTree == null ? new ClusterSchemaTree() : schemaTree;
   }
 
   private Analysis finishQuery(QueryStatement queryStatement, Analysis 
analysis) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
index bdb421c47d7..dbad1de404a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TemplatedDeviceAnalyze.java
@@ -294,17 +294,17 @@ public class TemplatedDeviceAnalyze {
               new MeasurementPath(new String[] {measurementName}, 
measurementSchema));
       // analyzeExpression(analysis, measurementPath);
       outputExpressions.add(new Pair<>(measurementPath, null));
-      for (PartialPath devicePath : deviceList) {
-        // TODO how to determine whether a device is aligned device
-        TimeSeriesOperand fullPath =
-            new TimeSeriesOperand(
-                new MeasurementPath(
-                    devicePath.concatNode(measurementName), measurementSchema, 
true));
-        // analyzeExpression(analysis, fullPath);
-        deviceToSelectExpressions
-            .computeIfAbsent(devicePath.getFullPath(), k -> new 
LinkedHashSet<>())
-            .add(fullPath);
-      }
+      //      for (PartialPath devicePath : deviceList) {
+      //        // TODO how to determine whether a device is aligned device
+      //        TimeSeriesOperand fullPath =
+      //            new TimeSeriesOperand(
+      //                new MeasurementPath(
+      //                    devicePath.concatNode(measurementName), 
measurementSchema, true));
+      //        // analyzeExpression(analysis, fullPath);
+      //        deviceToSelectExpressions
+      //            .computeIfAbsent(devicePath.getFullPath(), k -> new 
LinkedHashSet<>())
+      //            .add(fullPath);
+      //      }
     }
 
     Set<Expression> selectExpressions = new LinkedHashSet<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index d7e810b2b5f..eb6a66f36cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -145,6 +145,10 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
       return planBuilder.getRoot();
     }
 
+    if (queryStatement.isAlignByDevice() && 
!queryStatement.isAggregationQuery()) {
+      return new TemplatedLogicalPlan(analysis, queryStatement, 
context).visitQuery();
+    }
+
     if (queryStatement.isAlignByDevice()) {
       Map<String, PlanNode> deviceToSubPlanMap = new LinkedHashMap<>();
       for (PartialPath device : analysis.getDeviceList()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
new file mode 100644
index 00000000000..08d80a6c264
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.planner;
+
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.udf.builtin.ModelInferenceFunction;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.model.ForecastModelInferenceDescriptor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.model.ModelInferenceDescriptor;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.DEVICE;
+import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
+
+public class TemplatedLogicalPlan {
+
+  private Analysis analysis;
+  private QueryStatement queryStatement;
+  private MPPQueryContext context;
+  // If it's not `select *` query, may change the value of list below
+  List<String> measurementList = null;
+  List<IMeasurementSchema> schemaList = null;
+
+  public TemplatedLogicalPlan(
+      Analysis analysis, QueryStatement queryStatement, MPPQueryContext 
context) {
+    this.analysis = analysis;
+    this.queryStatement = queryStatement;
+    this.context = context;
+
+    measurementList = new 
ArrayList<>(analysis.getTemplateTypes().getSchemaMap().keySet());
+    schemaList = new 
ArrayList<>(analysis.getTemplateTypes().getSchemaMap().values());
+  }
+
+  public PlanNode visitQuery() {
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
+
+    Map<String, PlanNode> deviceToSubPlanMap = new LinkedHashMap<>();
+    for (PartialPath devicePath : analysis.getDeviceList()) {
+      String deviceName = devicePath.getFullPath();
+      LogicalPlanBuilder subPlanBuilder = new LogicalPlanBuilder(analysis, 
context);
+      subPlanBuilder =
+          subPlanBuilder.withNewRoot(
+              visitQueryBody(
+                  devicePath,
+                  analysis,
+                  queryStatement,
+                  analysis.getDeviceToSourceExpressions().get(deviceName),
+                  
analysis.getDeviceToSourceTransformExpressions().get(deviceName),
+                  analysis.getDeviceToWhereExpression() != null
+                      ? analysis.getDeviceToWhereExpression().get(deviceName)
+                      : null,
+                  analysis.getDeviceToAggregationExpressions().get(deviceName),
+                  analysis.getDeviceToGroupByExpression() != null
+                      ? analysis.getDeviceToGroupByExpression().get(deviceName)
+                      : null,
+                  analysis.getDeviceViewInputIndexesMap().get(deviceName),
+                  context));
+
+      // sortOperator push down
+      if (queryStatement.needPushDownSort()) {
+        subPlanBuilder =
+            subPlanBuilder.planOrderBy(
+                analysis.getDeviceToOrderByExpressions().get(deviceName),
+                analysis.getDeviceToSortItems().get(deviceName));
+      }
+      deviceToSubPlanMap.put(deviceName, subPlanBuilder.getRoot());
+    }
+
+    // convert to ALIGN BY DEVICE view
+    planBuilder =
+        planBuilder.planDeviceView(
+            deviceToSubPlanMap,
+            analysis.getDeviceViewOutputExpressions(),
+            analysis.getDeviceViewInputIndexesMap(),
+            analysis.getSelectExpressions(),
+            queryStatement);
+
+    if (planBuilder.getRoot() instanceof TopKNode) {
+      analysis.setUseTopKNode();
+    }
+
+    if (queryStatement.isAggregationQuery()) {
+      planBuilder =
+          planBuilder.planHavingAndTransform(
+              analysis.getHavingExpression(),
+              analysis.getSelectExpressions(),
+              analysis.getOrderByExpressions(),
+              queryStatement.isGroupByTime(),
+              queryStatement.getSelectComponent().getZoneId(),
+              queryStatement.getResultTimeOrder());
+    }
+
+    if (!queryStatement.needPushDownSort()) {
+      planBuilder =
+          planBuilder.planOrderBy(
+              queryStatement, analysis.getOrderByExpressions(), 
analysis.getSelectExpressions());
+    }
+
+    // other upstream node
+    planBuilder =
+        planBuilder
+            .planFill(analysis.getFillDescriptor(), 
queryStatement.getResultTimeOrder())
+            .planOffset(queryStatement.getRowOffset())
+            .planLimit(queryStatement.getRowLimit());
+
+    if (queryStatement.isModelInferenceQuery()) {
+      ModelInferenceDescriptor modelInferenceDescriptor = 
analysis.getModelInferenceDescriptor();
+      if (Objects.requireNonNull(modelInferenceDescriptor.getFunctionType())
+          == ModelInferenceFunction.FORECAST) {
+        ForecastModelInferenceDescriptor forecastModelInferenceDescriptor =
+            (ForecastModelInferenceDescriptor) modelInferenceDescriptor;
+        planBuilder
+            .planLimit(forecastModelInferenceDescriptor.getModelInputLength())
+            .planForecast(forecastModelInferenceDescriptor);
+      } else {
+        throw new IllegalArgumentException(
+            "Unsupported model inference function type: "
+                + modelInferenceDescriptor.getFunctionType());
+      }
+    }
+
+    // plan select into
+    if (queryStatement.isAlignByDevice()) {
+      planBuilder = 
planBuilder.planDeviceViewInto(analysis.getDeviceViewIntoPathDescriptor());
+    } else {
+      planBuilder = planBuilder.planInto(analysis.getIntoPathDescriptor());
+    }
+
+    return planBuilder.getRoot();
+  }
+
+  public PlanNode visitQueryBody(
+      PartialPath devicePath,
+      Analysis analysis,
+      QueryStatement queryStatement,
+      Set<Expression> sourceExpressions,
+      Set<Expression> sourceTransformExpressions,
+      Expression whereExpression,
+      Set<Expression> aggregationExpressions,
+      Expression groupByExpression,
+      List<Integer> deviceViewInputIndexes,
+      MPPQueryContext context) {
+    return planRawDataSource(
+        devicePath,
+        queryStatement.getResultTimeOrder(),
+        analysis.getGlobalTimeFilter(),
+        0,
+        pushDownLimitToScanNode(queryStatement),
+        analysis.isLastLevelUseWildcard());
+  }
+
+  private long pushDownLimitToScanNode(QueryStatement queryStatement) {
+    // `order by time|device LIMIT N align by device` and no value filter,
+    // can push down limitValue to ScanNode
+    if (queryStatement.isAlignByDevice()
+        && queryStatement.hasLimit()
+        && !analysis.hasValueFilter()
+        && (queryStatement.isOrderByBasedOnDevice() || 
queryStatement.isOrderByBasedOnTime())) {
+
+      // both `offset` and `limit` exist, push `limit+offset` down as 
limitValue
+      if (queryStatement.hasOffset()) {
+        return queryStatement.getRowOffset() + queryStatement.getRowLimit();
+      }
+
+      // only `limit` exist, push `limit` down as limitValue
+      return queryStatement.getRowLimit();
+    }
+
+    return 0;
+  }
+
+  public PlanNode planRawDataSource(
+      PartialPath devicePath,
+      Ordering scanOrder,
+      Filter timeFilter,
+      long offset,
+      long limit,
+      boolean lastLevelUseWildcard) {
+    List<PlanNode> sourceNodeList = new ArrayList<>();
+
+    AlignedPath path = new AlignedPath(devicePath);
+    path.setMeasurementList(measurementList);
+    path.addSchemas(schemaList);
+
+    AlignedSeriesScanNode alignedSeriesScanNode =
+        new AlignedSeriesScanNode(
+            context.getQueryId().genPlanNodeId(),
+            path,
+            scanOrder,
+            timeFilter,
+            timeFilter,
+            limit,
+            offset,
+            null,
+            lastLevelUseWildcard);
+    sourceNodeList.add(alignedSeriesScanNode);
+
+    // just group and put into type provider
+    // updateTypeProvider(sourceExpressions);
+
+    return convergeWithTimeJoin(sourceNodeList, scanOrder);
+  }
+
+  private void updateTypeProvider(Collection<Expression> expressions) {
+    if (expressions == null) {
+      return;
+    }
+    expressions.forEach(
+        expression -> {
+          if (!expression.getExpressionString().equals(DEVICE)
+              && !expression.getExpressionString().equals(ENDTIME)) {
+            context
+                .getTypeProvider()
+                .setType(expression.getExpressionString(), 
analysis.getType(expression));
+          }
+        });
+  }
+
+  private PlanNode convergeWithTimeJoin(List<PlanNode> sourceNodes, Ordering 
mergeOrder) {
+    PlanNode tmpNode;
+    if (sourceNodes.size() == 1) {
+      tmpNode = sourceNodes.get(0);
+    } else {
+      tmpNode = new TimeJoinNode(context.getQueryId().genPlanNodeId(), 
mergeOrder, sourceNodes);
+    }
+    return tmpNode;
+  }
+}

Reply via email to