This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch trasnsform-operator-bugfix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5a3e4512f48a28cc6ededb13933ff6aa5b30cdf4
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon May 16 11:53:21 2022 +0800

    bind transformer input column with operator column input index
---
 .../db/mpp/execution/operator/process/FilterOperator.java |  9 +++++++--
 .../mpp/execution/operator/process/TransformOperator.java |  9 +++++++--
 .../iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java  |  4 ++++
 .../org/apache/iotdb/db/query/expression/Expression.java  |  6 ++++++
 .../db/query/expression/binary/BinaryExpression.java      | 15 ++++++++++++++-
 .../iotdb/db/query/expression/leaf/ConstantOperand.java   |  7 +++++++
 .../iotdb/db/query/expression/leaf/TimeSeriesOperand.java | 10 ++++++++++
 .../iotdb/db/query/expression/leaf/TimestampOperand.java  |  7 +++++++
 .../db/query/expression/multi/FunctionExpression.java     | 14 ++++++++++++++
 .../iotdb/db/query/expression/unary/UnaryExpression.java  | 12 ++++++++++++
 .../db/query/udf/core/layer/EvaluationDAGBuilder.java     | 12 ++++++++++++
 11 files changed, 100 insertions(+), 5 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
index 7da3d8f34a..f9eafceb03 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -36,6 +37,7 @@ import java.io.IOException;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 public class FilterOperator extends TransformOperator {
 
@@ -45,6 +47,7 @@ public class FilterOperator extends TransformOperator {
       OperatorContext operatorContext,
       Operator inputOperator,
       List<TSDataType> inputDataTypes,
+      Map<String, List<InputLocation>> inputLocations,
       Expression filterExpression,
       Expression[] outputExpressions,
       boolean keepNull,
@@ -55,6 +58,7 @@ public class FilterOperator extends TransformOperator {
         operatorContext,
         inputOperator,
         inputDataTypes,
+        inputLocations,
         bindExpressions(filterExpression, outputExpressions),
         keepNull,
         zoneId,
@@ -70,9 +74,10 @@ public class FilterOperator extends TransformOperator {
   }
 
   @Override
-  protected void initTransformers(TypeProvider typeProvider)
+  protected void initTransformers(
+      Map<String, List<InputLocation>> inputLocations, TypeProvider typeProvider)
       throws QueryProcessException, IOException {
-    super.initTransformers(typeProvider);
+    super.initTransformers(inputLocations, typeProvider);
 
     filterPointReader = transformers[transformers.length - 1];
     if (filterPointReader.getDataType() != TSDataType.BOOLEAN) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index 1b96943039..7a99fae9bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.layer.EvaluationDAGBuilder;
@@ -46,6 +47,7 @@ import java.io.IOException;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 public class TransformOperator implements ProcessOperator {
 
@@ -75,6 +77,7 @@ public class TransformOperator implements ProcessOperator {
       OperatorContext operatorContext,
       Operator inputOperator,
       List<TSDataType> inputDataTypes,
+      Map<String, List<InputLocation>> inputLocations,
       Expression[] outputExpressions,
       boolean keepNull,
       ZoneId zoneId,
@@ -88,7 +91,7 @@ public class TransformOperator implements ProcessOperator {
 
     initInputLayer(inputDataTypes);
     initUdtfContext(zoneId);
-    initTransformers(typeProvider);
+    initTransformers(inputLocations, typeProvider);
     readyForFirstIteration();
   }
 
@@ -105,7 +108,8 @@ public class TransformOperator implements ProcessOperator {
     udtfContext.constructUdfExecutors(outputExpressions);
   }
 
-  protected void initTransformers(TypeProvider typeProvider)
+  protected void initTransformers(
+      Map<String, List<InputLocation>> inputLocations, TypeProvider typeProvider)
       throws QueryProcessException, IOException {
     UDFRegistrationService.getInstance().acquireRegistrationLock();
     try {
@@ -116,6 +120,7 @@ public class TransformOperator implements ProcessOperator {
           new EvaluationDAGBuilder(
                   operatorContext.getOperatorId(),
                   inputLayer,
+                  inputLocations,
                   outputExpressions,
                   udtfContext,
                   typeProvider,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 3273638f04..492ab8f029 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -455,12 +455,14 @@ public class LocalExecutionPlanner {
               TransformNode.class.getSimpleName());
       final Operator inputOperator = generateOnlyChildOperator(node, context);
       final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+      final Map<String, List<InputLocation>> inputLocations = makeLayout(node);
 
       try {
         return new TransformOperator(
             operatorContext,
             inputOperator,
             inputDataTypes,
+            inputLocations,
             node.getOutputExpressions(),
             node.isKeepNull(),
             node.getZoneId(),
@@ -477,12 +479,14 @@ public class LocalExecutionPlanner {
               context.getNextOperatorId(), node.getPlanNodeId(), FilterNode.class.getSimpleName());
       final Operator inputOperator = generateOnlyChildOperator(node, context);
       final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, context.getTypeProvider());
+      final Map<String, List<InputLocation>> inputLocations = makeLayout(node);
 
       try {
         return new FilterOperator(
             operatorContext,
             inputOperator,
             inputDataTypes,
+            inputLocations,
             node.getPredicate(),
             node.getOutputExpressions(),
             node.isKeepNull(),
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 57857be4bb..28698ab133 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.binary.AdditionExpression;
 import org.apache.iotdb.db.query.expression.binary.DivisionExpression;
@@ -135,8 +136,13 @@ public abstract class Expression {
 
   protected Integer inputColumnIndex = null;
 
+  // TODO: remove after MPP finish
+  @Deprecated
   public abstract void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan);
 
+  public abstract void bindInputLayerColumnIndexWithExpression(
+      Map<String, List<InputLocation>> inputLocations);
+
   public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner);
 
   // TODO: remove after MPP finish
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 34a9d11b1a..6dac04bf29 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
@@ -192,12 +193,24 @@ public abstract class BinaryExpression extends Expression {
   }
 
   @Override
-  public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
+  public final void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) {
     leftExpression.bindInputLayerColumnIndexWithExpression(udtfPlan);
     rightExpression.bindInputLayerColumnIndexWithExpression(udtfPlan);
     inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
   }
 
+  @Override
+  public final void bindInputLayerColumnIndexWithExpression(
+      Map<String, List<InputLocation>> inputLocations) {
+    leftExpression.bindInputLayerColumnIndexWithExpression(inputLocations);
+    rightExpression.bindInputLayerColumnIndexWithExpression(inputLocations);
+
+    final String digest = toString();
+    if (inputLocations.containsKey(digest)) {
+      inputColumnIndex = inputLocations.get(digest).get(0).getValueColumnIndex();
+    }
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     leftExpression.updateStatisticsForMemoryAssigner(memoryAssigner);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java
index 90533d653f..8a6c4589c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.expression.leaf;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.ExpressionType;
@@ -104,6 +105,12 @@ public class ConstantOperand extends LeafOperand {
     // Do nothing
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(
+      Map<String, List<InputLocation>> inputLocations) {
+    // Do nothing
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     // Do nothing
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
index e8e6aa8cc2..cdab2e4adc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PathDeserializeUtil;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.ExpressionType;
@@ -99,6 +100,15 @@ public class TimeSeriesOperand extends LeafOperand {
     inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(
+      Map<String, List<InputLocation>> inputLocations) {
+    final String digest = toString();
+    if (inputLocations.containsKey(digest)) {
+      inputColumnIndex = inputLocations.get(digest).get(0).getValueColumnIndex();
+    }
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     memoryAssigner.increaseExpressionReference(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java
index dbea065fe4..8c9f6c741d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.ExpressionType;
@@ -87,6 +88,12 @@ public class TimestampOperand extends LeafOperand {
     // do nothing
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(
+      Map<String, List<InputLocation>> inputLocations) {
+    // do nothing
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     memoryAssigner.increaseExpressionReference(this);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java
index c63c08083e..caa87859e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer;
@@ -279,6 +280,19 @@ public class FunctionExpression extends Expression {
     inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
   }
 
+  @Override
+  public void bindInputLayerColumnIndexWithExpression(
+      Map<String, List<InputLocation>> inputLocations) {
+    for (Expression expression : expressions) {
+      expression.bindInputLayerColumnIndexWithExpression(inputLocations);
+    }
+
+    final String digest = toString();
+    if (inputLocations.containsKey(digest)) {
+      inputColumnIndex = inputLocations.get(digest).get(0).getValueColumnIndex();
+    }
+  }
+
   @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     for (Expression expression : expressions) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java
index 249398fc6d..02ec49a4b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -96,6 +97,17 @@ public abstract class UnaryExpression extends Expression {
     inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString());
   }
 
+  @Override
+  public final void bindInputLayerColumnIndexWithExpression(
+      Map<String, List<InputLocation>> inputLocations) {
+    expression.bindInputLayerColumnIndexWithExpression(inputLocations);
+
+    final String digest = toString();
+    if (inputLocations.containsKey(digest)) {
+      inputColumnIndex = inputLocations.get(digest).get(0).getValueColumnIndex();
+    }
+  }
+
   @Override
   public final void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) {
     expression.updateStatisticsForMemoryAssigner(memoryAssigner);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
index 92f52963d5..80479da2d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
@@ -21,12 +21,14 @@ package org.apache.iotdb.db.query.udf.core.layer;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class EvaluationDAGBuilder {
@@ -34,6 +36,7 @@ public class EvaluationDAGBuilder {
   private final long queryId;
 
   private final RawQueryInputLayer inputLayer;
+  private final Map<String, List<InputLocation>> inputLocations;
 
   private final Expression[] outputExpressions;
   private final LayerPointReader[] outputPointReaders;
@@ -52,12 +55,14 @@ public class EvaluationDAGBuilder {
   public EvaluationDAGBuilder(
       long queryId,
       RawQueryInputLayer inputLayer,
+      Map<String, List<InputLocation>> inputLocations,
       Expression[] outputExpressions,
       UDTFContext udtfContext,
       TypeProvider typeProvider,
       float memoryBudgetInMB) {
     this.queryId = queryId;
     this.inputLayer = inputLayer;
+    this.inputLocations = inputLocations;
     this.outputExpressions = outputExpressions;
     this.udtfContext = udtfContext;
     this.typeProvider = typeProvider;
@@ -78,6 +83,13 @@ public class EvaluationDAGBuilder {
     return this;
   }
 
+  public EvaluationDAGBuilder bindInputLayerColumnIndexWithExpression() {
+    for (Expression expression : outputExpressions) {
+      expression.bindInputLayerColumnIndexWithExpression(inputLocations);
+    }
+    return this;
+  }
+
   public EvaluationDAGBuilder buildResultColumnPointReaders()
       throws QueryProcessException, IOException {
     for (int i = 0; i < outputExpressions.length; ++i) {

Reply via email to