This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch trasnsform-operator-bugfix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 666ee799e867848c507023ee59f47f9c4505c9d9
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon May 16 12:15:01 2022 +0800

    bind input types
---
 .../mpp/execution/operator/process/FilterOperator.java  |  6 ++++--
 .../execution/operator/process/TransformOperator.java   | 17 ++++++++---------
 .../db/mpp/plan/planner/LocalExecutionPlanner.java      | 12 ++++++++++--
 .../db/query/udf/core/layer/EvaluationDAGBuilder.java   |  7 ++++---
 4 files changed, 26 insertions(+), 16 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
index 54023374cf..8522d065cc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
@@ -75,9 +75,11 @@ public class FilterOperator extends TransformOperator {
 
   @Override
   protected void initTransformers(
-      Map<String, List<InputLocation>> inputLocations, TypeProvider 
typeProvider)
+      Map<String, List<InputLocation>> inputLocations,
+      Expression[] outputExpressions,
+      TypeProvider typeProvider)
       throws QueryProcessException, IOException {
-    super.initTransformers(inputLocations, typeProvider);
+    super.initTransformers(inputLocations, outputExpressions, typeProvider);
 
     filterPointReader = transformers[transformers.length - 1];
     if (filterPointReader.getDataType() != TSDataType.BOOLEAN) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index de3cedcc3c..784573cd1b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -63,8 +63,6 @@ public class TransformOperator implements ProcessOperator {
 
   protected final OperatorContext operatorContext;
   protected final Operator inputOperator;
-  protected final List<TSDataType> inputDataTypes;
-  protected final Expression[] outputExpressions;
   protected final boolean keepNull;
 
   protected boolean isFirstIteration;
@@ -87,15 +85,13 @@ public class TransformOperator implements ProcessOperator {
       throws QueryProcessException, IOException {
     this.operatorContext = operatorContext;
     this.inputOperator = inputOperator;
-    this.inputDataTypes = inputDataTypes;
-    this.outputExpressions = outputExpressions;
     this.keepNull = keepNull;
 
     isFirstIteration = true;
 
     initInputLayer(inputDataTypes);
-    initUdtfContext(zoneId);
-    initTransformers(inputLocations, typeProvider);
+    initUdtfContext(outputExpressions, zoneId);
+    initTransformers(inputLocations, outputExpressions, typeProvider);
   }
 
   private void initInputLayer(List<TSDataType> inputDataTypes) throws 
QueryProcessException {
@@ -106,13 +102,15 @@ public class TransformOperator implements ProcessOperator 
{
             new TsBlockInputDataSet(inputOperator, inputDataTypes));
   }
 
-  private void initUdtfContext(ZoneId zoneId) {
+  private void initUdtfContext(Expression[] outputExpressions, ZoneId zoneId) {
     udtfContext = new UDTFContext(zoneId);
     udtfContext.constructUdfExecutors(outputExpressions);
   }
 
   protected void initTransformers(
-      Map<String, List<InputLocation>> inputLocations, TypeProvider 
typeProvider)
+      Map<String, List<InputLocation>> inputLocations,
+      Expression[] outputExpressions,
+      TypeProvider typeProvider)
       throws QueryProcessException, IOException {
     UDFRegistrationService.getInstance().acquireRegistrationLock();
     try {
@@ -125,10 +123,11 @@ public class TransformOperator implements ProcessOperator 
{
                   inputLayer,
                   inputLocations,
                   outputExpressions,
-                  udtfContext,
                   typeProvider,
+                  udtfContext,
                   udfTransformerMemoryBudgetInMB + 
udfCollectorMemoryBudgetInMB)
               .buildLayerMemoryAssigner()
+              .bindInputLayerColumnIndexWithExpression()
               .buildResultColumnPointReaders()
               .getOutputPointReaders();
     } finally {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 492ab8f029..6a8c5956e2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -454,7 +454,7 @@ public class LocalExecutionPlanner {
               node.getPlanNodeId(),
               TransformNode.class.getSimpleName());
       final Operator inputOperator = generateOnlyChildOperator(node, context);
-      final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
+      final List<TSDataType> inputDataTypes = getInputColumnTypes(node, 
context.getTypeProvider());
       final Map<String, List<InputLocation>> inputLocations = makeLayout(node);
 
       try {
@@ -478,7 +478,7 @@ public class LocalExecutionPlanner {
           context.instanceContext.addOperatorContext(
               context.getNextOperatorId(), node.getPlanNodeId(), 
FilterNode.class.getSimpleName());
       final Operator inputOperator = generateOnlyChildOperator(node, context);
-      final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
+      final List<TSDataType> inputDataTypes = getInputColumnTypes(node, 
context.getTypeProvider());
       final Map<String, List<InputLocation>> inputLocations = makeLayout(node);
 
       try {
@@ -680,6 +680,14 @@ public class LocalExecutionPlanner {
       return outputMappings;
     }
 
+    private List<TSDataType> getInputColumnTypes(PlanNode node, TypeProvider 
typeProvider) {
+      return node.getChildren().stream()
+          .map(PlanNode::getOutputColumnNames)
+          .flatMap(List::stream)
+          .map(typeProvider::getType)
+          .collect(Collectors.toList());
+    }
+
     private List<TSDataType> getOutputColumnTypes(PlanNode node, TypeProvider 
typeProvider) {
       return node.getOutputColumnNames().stream()
           .map(typeProvider::getType)
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
 
b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
index 80479da2d1..003447f0e4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
@@ -41,6 +41,8 @@ public class EvaluationDAGBuilder {
   private final Expression[] outputExpressions;
   private final LayerPointReader[] outputPointReaders;
 
+  private final TypeProvider typeProvider;
+
   private final UDTFContext udtfContext;
 
   private final LayerMemoryAssigner memoryAssigner;
@@ -50,22 +52,21 @@ public class EvaluationDAGBuilder {
   // sub-expressions, but they can share the same point reader. we cache the 
point reader here to
   // make sure that only one point reader will be built for one expression.
   private final Map<Expression, IntermediateLayer> 
expressionIntermediateLayerMap;
-  private final TypeProvider typeProvider;
 
   public EvaluationDAGBuilder(
       long queryId,
       RawQueryInputLayer inputLayer,
       Map<String, List<InputLocation>> inputLocations,
       Expression[] outputExpressions,
-      UDTFContext udtfContext,
       TypeProvider typeProvider,
+      UDTFContext udtfContext,
       float memoryBudgetInMB) {
     this.queryId = queryId;
     this.inputLayer = inputLayer;
     this.inputLocations = inputLocations;
     this.outputExpressions = outputExpressions;
-    this.udtfContext = udtfContext;
     this.typeProvider = typeProvider;
+    this.udtfContext = udtfContext;
 
     int size = inputLayer.getInputColumnCount();
     outputPointReaders = new LayerPointReader[size];

Reply via email to