This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch trasnsform-operator-bugfix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 279f96667d85240f0d16df4e9be92bf9e5c7da9d
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon May 16 11:36:20 2022 +0800

    reused TypeProvider in converting Expression->Transformer stage
---
 .../execution/operator/process/FilterOperator.java |   5 +-
 .../operator/process/TransformOperator.java        |  13 +--
 .../iotdb/db/query/expression/Expression.java      |  11 +++
 .../query/expression/binary/BinaryExpression.java  |  49 ++++++++++
 .../db/query/expression/leaf/ConstantOperand.java  |  19 ++++
 .../query/expression/leaf/TimeSeriesOperand.java   |  28 ++++++
 .../db/query/expression/leaf/TimestampOperand.java |  26 ++++++
 .../query/expression/multi/FunctionExpression.java | 101 +++++++++++++++++++++
 .../db/query/expression/unary/UnaryExpression.java |  38 ++++++++
 .../db/query/udf/core/executor/UDTFExecutor.java   |  27 ++++++
 .../query/udf/core/layer/EvaluationDAGBuilder.java |   9 +-
 11 files changed, 311 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
index 79e5e2fd2f..7da3d8f34a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
@@ -70,8 +70,9 @@ public class FilterOperator extends TransformOperator {
   }
 
   @Override
-  protected void initTransformers() throws QueryProcessException, IOException {
-    super.initTransformers();
+  protected void initTransformers(TypeProvider typeProvider)
+      throws QueryProcessException, IOException {
+    super.initTransformers(typeProvider);
 
     filterPointReader = transformers[transformers.length - 1];
     if (filterPointReader.getDataType() != TSDataType.BOOLEAN) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index 7d47611f6d..1b96943039 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -88,9 +88,8 @@ public class TransformOperator implements ProcessOperator {
 
     initInputLayer(inputDataTypes);
     initUdtfContext(zoneId);
-    initTransformers();
+    initTransformers(typeProvider);
     readyForFirstIteration();
-    updateTypeProvider(typeProvider);
   }
 
   private void initInputLayer(List<TSDataType> inputDataTypes) throws 
QueryProcessException {
@@ -106,7 +105,8 @@ public class TransformOperator implements ProcessOperator {
     udtfContext.constructUdfExecutors(outputExpressions);
   }
 
-  protected void initTransformers() throws QueryProcessException, IOException {
+  protected void initTransformers(TypeProvider typeProvider)
+      throws QueryProcessException, IOException {
     UDFRegistrationService.getInstance().acquireRegistrationLock();
     try {
       // This statement must be surrounded by the registration lock.
@@ -118,6 +118,7 @@ public class TransformOperator implements ProcessOperator {
                   inputLayer,
                   outputExpressions,
                   udtfContext,
+                  typeProvider,
                   udfTransformerMemoryBudgetInMB + 
udfCollectorMemoryBudgetInMB)
               .buildLayerMemoryAssigner()
               .buildResultColumnPointReaders()
@@ -149,12 +150,6 @@ public class TransformOperator implements ProcessOperator {
     }
   }
 
-  private void updateTypeProvider(TypeProvider typeProvider) {
-    for (int i = 0; i < transformers.length; ++i) {
-      typeProvider.setType(outputExpressions[i].toString(), 
transformers[i].getDataType());
-    }
-  }
-
   @Override
   public boolean hasNext() {
     return !timeHeap.isEmpty();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 4dceb6106e..57857be4bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -139,6 +139,8 @@ public abstract class Expression {
 
   public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner 
memoryAssigner);
 
+  // TODO: remove after MPP finish
+  @Deprecated
   public abstract IntermediateLayer constructIntermediateLayer(
       long queryId,
       UDTFContext udtfContext,
@@ -148,6 +150,15 @@ public abstract class Expression {
       LayerMemoryAssigner memoryAssigner)
       throws QueryProcessException, IOException;
 
+  public abstract IntermediateLayer constructIntermediateLayer(
+      long queryId,
+      UDTFContext udtfContext,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      TypeProvider typeProvider,
+      LayerMemoryAssigner memoryAssigner)
+      throws QueryProcessException, IOException;
+
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   // isConstantOperand
   
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index d9a5836d9b..34a9d11b1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.expression.binary;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
@@ -253,6 +254,54 @@ public abstract class BinaryExpression extends Expression {
     return expressionIntermediateLayerMap.get(this);
   }
 
+  @Override
+  public IntermediateLayer constructIntermediateLayer(
+      long queryId,
+      UDTFContext udtfContext,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      TypeProvider typeProvider,
+      LayerMemoryAssigner memoryAssigner)
+      throws QueryProcessException, IOException {
+    if (!expressionIntermediateLayerMap.containsKey(this)) {
+      float memoryBudgetInMB = memoryAssigner.assign();
+
+      IntermediateLayer leftParentIntermediateLayer =
+          leftExpression.constructIntermediateLayer(
+              queryId,
+              udtfContext,
+              rawTimeSeriesInputLayer,
+              expressionIntermediateLayerMap,
+              typeProvider,
+              memoryAssigner);
+      IntermediateLayer rightParentIntermediateLayer =
+          rightExpression.constructIntermediateLayer(
+              queryId,
+              udtfContext,
+              rawTimeSeriesInputLayer,
+              expressionIntermediateLayerMap,
+              typeProvider,
+              memoryAssigner);
+      Transformer transformer =
+          constructTransformer(
+              leftParentIntermediateLayer.constructPointReader(),
+              rightParentIntermediateLayer.constructPointReader());
+
+      // SingleInputColumnMultiReferenceIntermediateLayer doesn't support ConstantLayerPointReader
+      // yet. And since a ConstantLayerPointReader won't produce too much IO,
+      // SingleInputColumnSingleReferenceIntermediateLayer could be a better choice.
+      expressionIntermediateLayerMap.put(
+          this,
+          memoryAssigner.getReference(this) == 1 || isConstantOperand()
+              ? new SingleInputColumnSingleReferenceIntermediateLayer(
+                  this, queryId, memoryBudgetInMB, transformer)
+              : new SingleInputColumnMultiReferenceIntermediateLayer(
+                  this, queryId, memoryBudgetInMB, transformer));
+    }
+
+    return expressionIntermediateLayerMap.get(this);
+  }
+
   protected abstract BinaryTransformer constructTransformer(
       LayerPointReader leftParentLayerPointReader, LayerPointReader 
rightParentLayerPointReader);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java
index 7acd877fc0..90533d653f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.commons.lang3.Validate;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
@@ -127,6 +128,24 @@ public class ConstantOperand extends LeafOperand {
     return expressionIntermediateLayerMap.get(this);
   }
 
+  @Override
+  public IntermediateLayer constructIntermediateLayer(
+      long queryId,
+      UDTFContext udtfContext,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      TypeProvider typeProvider,
+      LayerMemoryAssigner memoryAssigner)
+      throws QueryProcessException, IOException {
+    if (!expressionIntermediateLayerMap.containsKey(this)) {
+      IntermediateLayer intermediateLayer =
+          new ConstantIntermediateLayer(this, queryId, 
memoryAssigner.assign());
+      expressionIntermediateLayerMap.put(this, intermediateLayer);
+    }
+
+    return expressionIntermediateLayerMap.get(this);
+  }
+
   @Override
   public String getExpressionStringInternal() {
     return valueString;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
index 5d896a932f..e8e6aa8cc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReference
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
@@ -131,6 +132,33 @@ public class TimeSeriesOperand extends LeafOperand {
     return expressionIntermediateLayerMap.get(this);
   }
 
+  @Override
+  public IntermediateLayer constructIntermediateLayer(
+      long queryId,
+      UDTFContext udtfContext,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      TypeProvider typeProvider,
+      LayerMemoryAssigner memoryAssigner)
+      throws QueryProcessException, IOException {
+    if (!expressionIntermediateLayerMap.containsKey(this)) {
+      float memoryBudgetInMB = memoryAssigner.assign();
+
+      LayerPointReader parentLayerPointReader =
+          rawTimeSeriesInputLayer.constructValuePointReader(inputColumnIndex);
+
+      expressionIntermediateLayerMap.put(
+          this,
+          memoryAssigner.getReference(this) == 1
+              ? new SingleInputColumnSingleReferenceIntermediateLayer(
+                  this, queryId, memoryBudgetInMB, parentLayerPointReader)
+              : new SingleInputColumnMultiReferenceIntermediateLayer(
+                  this, queryId, memoryBudgetInMB, parentLayerPointReader));
+    }
+
+    return expressionIntermediateLayerMap.get(this);
+  }
+
   @Override
   public String getExpressionStringInternal() {
     return path.isMeasurementAliasExists() ? path.getFullPathWithAlias() : 
path.getFullPath();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java
index bb473451b7..dbea065fe4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java
@@ -119,6 +119,32 @@ public class TimestampOperand extends LeafOperand {
     return expressionIntermediateLayerMap.get(this);
   }
 
+  @Override
+  public IntermediateLayer constructIntermediateLayer(
+      long queryId,
+      UDTFContext udtfContext,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      TypeProvider typeProvider,
+      LayerMemoryAssigner memoryAssigner)
+      throws QueryProcessException, IOException {
+    if (!expressionIntermediateLayerMap.containsKey(this)) {
+      float memoryBudgetInMB = memoryAssigner.assign();
+
+      LayerPointReader parentLayerPointReader = 
rawTimeSeriesInputLayer.constructTimePointReader();
+
+      expressionIntermediateLayerMap.put(
+          this,
+          memoryAssigner.getReference(this) == 1
+              ? new SingleInputColumnSingleReferenceIntermediateLayer(
+                  this, queryId, memoryBudgetInMB, parentLayerPointReader)
+              : new SingleInputColumnMultiReferenceIntermediateLayer(
+                  this, queryId, memoryBudgetInMB, parentLayerPointReader));
+    }
+
+    return expressionIntermediateLayerMap.get(this);
+  }
+
   @Override
   protected boolean isConstantOperandInternal() {
     return false;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java
index 0ec226403f..c63c08083e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java
@@ -287,6 +287,103 @@ public class FunctionExpression extends Expression {
     }
   }
 
+  @Override
+  public IntermediateLayer constructIntermediateLayer(
+      long queryId,
+      UDTFContext udtfContext,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      TypeProvider typeProvider,
+      LayerMemoryAssigner memoryAssigner)
+      throws QueryProcessException, IOException {
+    if (!expressionIntermediateLayerMap.containsKey(this)) {
+      float memoryBudgetInMB = memoryAssigner.assign();
+      Transformer transformer;
+      if (isBuiltInAggregationFunctionExpression) {
+        transformer =
+            new TransparentTransformer(
+                
rawTimeSeriesInputLayer.constructValuePointReader(inputColumnIndex));
+      } else {
+        IntermediateLayer udfInputIntermediateLayer =
+            constructUdfInputIntermediateLayer(
+                queryId,
+                udtfContext,
+                rawTimeSeriesInputLayer,
+                expressionIntermediateLayerMap,
+                typeProvider,
+                memoryAssigner);
+        transformer =
+            constructUdfTransformer(
+                queryId, udtfContext, typeProvider, memoryAssigner, 
udfInputIntermediateLayer);
+      }
+      expressionIntermediateLayerMap.put(
+          this,
+          memoryAssigner.getReference(this) == 1
+              ? new SingleInputColumnSingleReferenceIntermediateLayer(
+                  this, queryId, memoryBudgetInMB, transformer)
+              : new SingleInputColumnMultiReferenceIntermediateLayer(
+                  this, queryId, memoryBudgetInMB, transformer));
+    }
+
+    return expressionIntermediateLayerMap.get(this);
+  }
+
+  private IntermediateLayer constructUdfInputIntermediateLayer(
+      long queryId,
+      UDTFContext udtfContext,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      TypeProvider typeProvider,
+      LayerMemoryAssigner memoryAssigner)
+      throws QueryProcessException, IOException {
+    List<IntermediateLayer> intermediateLayers = new ArrayList<>();
+    for (Expression expression : expressions) {
+      intermediateLayers.add(
+          expression.constructIntermediateLayer(
+              queryId,
+              udtfContext,
+              rawTimeSeriesInputLayer,
+              expressionIntermediateLayerMap,
+              typeProvider,
+              memoryAssigner));
+    }
+    return intermediateLayers.size() == 1
+        ? intermediateLayers.get(0)
+        : new MultiInputColumnIntermediateLayer(
+            this,
+            queryId,
+            memoryAssigner.assign(),
+            intermediateLayers.stream()
+                .map(IntermediateLayer::constructPointReader)
+                .collect(Collectors.toList()));
+  }
+
+  private UDFQueryTransformer constructUdfTransformer(
+      long queryId,
+      UDTFContext udtfContext,
+      TypeProvider typeProvider,
+      LayerMemoryAssigner memoryAssigner,
+      IntermediateLayer udfInputIntermediateLayer)
+      throws QueryProcessException, IOException {
+    UDTFExecutor executor = udtfContext.getExecutorByFunctionExpression(this);
+
+    executor.beforeStart(queryId, memoryAssigner.assign(), typeProvider);
+
+    AccessStrategy accessStrategy = 
executor.getConfigurations().getAccessStrategy();
+    switch (accessStrategy.getAccessStrategyType()) {
+      case ROW_BY_ROW:
+        return new 
UDFQueryRowTransformer(udfInputIntermediateLayer.constructRowReader(), 
executor);
+      case SLIDING_SIZE_WINDOW:
+      case SLIDING_TIME_WINDOW:
+        return new UDFQueryRowWindowTransformer(
+            udfInputIntermediateLayer.constructRowWindowReader(
+                accessStrategy, memoryAssigner.assign()),
+            executor);
+      default:
+        throw new UnsupportedOperationException("Unsupported transformer 
access strategy");
+    }
+  }
+
   @Override
   public IntermediateLayer constructIntermediateLayer(
       long queryId,
@@ -333,6 +430,8 @@ public class FunctionExpression extends Expression {
     return expressionIntermediateLayerMap.get(this);
   }
 
+  // TODO: remove it after MPP finished
+  @Deprecated
   private IntermediateLayer constructUdfInputIntermediateLayer(
       long queryId,
       UDTFContext udtfContext,
@@ -363,6 +462,8 @@ public class FunctionExpression extends Expression {
                 .collect(Collectors.toList()));
   }
 
+  // TODO: remove it after MPP finished
+  @Deprecated
   private UDFQueryTransformer constructUdfTransformer(
       long queryId,
       UDTFContext udtfContext,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java
index f82d980e7c..249398fc6d 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.expression.unary;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.query.LogicalOptimizeException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -139,6 +140,43 @@ public abstract class UnaryExpression extends Expression {
     return expressionIntermediateLayerMap.get(this);
   }
 
+  @Override
+  public IntermediateLayer constructIntermediateLayer(
+      long queryId,
+      UDTFContext udtfContext,
+      RawQueryInputLayer rawTimeSeriesInputLayer,
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap,
+      TypeProvider typeProvider,
+      LayerMemoryAssigner memoryAssigner)
+      throws QueryProcessException, IOException {
+    if (!expressionIntermediateLayerMap.containsKey(this)) {
+      float memoryBudgetInMB = memoryAssigner.assign();
+
+      IntermediateLayer parentLayerPointReader =
+          expression.constructIntermediateLayer(
+              queryId,
+              udtfContext,
+              rawTimeSeriesInputLayer,
+              expressionIntermediateLayerMap,
+              typeProvider,
+              memoryAssigner);
+      Transformer transformer = 
constructTransformer(parentLayerPointReader.constructPointReader());
+
+      // SingleInputColumnMultiReferenceIntermediateLayer doesn't support ConstantLayerPointReader
+      // yet. And since a ConstantLayerPointReader won't produce too much IO,
+      // SingleInputColumnSingleReferenceIntermediateLayer could be a better choice.
+      expressionIntermediateLayerMap.put(
+          this,
+          memoryAssigner.getReference(this) == 1 || isConstantOperand()
+              ? new SingleInputColumnSingleReferenceIntermediateLayer(
+                  this, queryId, memoryBudgetInMB, transformer)
+              : new SingleInputColumnMultiReferenceIntermediateLayer(
+                  this, queryId, memoryBudgetInMB, transformer));
+    }
+
+    return expressionIntermediateLayerMap.get(this);
+  }
+
   protected abstract Transformer constructTransformer(LayerPointReader 
pointReader);
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
index 83cb5990a0..b8f697f6b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.query.udf.core.executor;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.query.udf.api.UDTF;
@@ -53,6 +54,32 @@ public class UDTFExecutor {
     configurations = new UDTFConfigurations(zoneId);
   }
 
+  public void beforeStart(long queryId, float collectorMemoryBudgetInMB, 
TypeProvider typeProvider)
+      throws QueryProcessException {
+    udtf = (UDTF) UDFRegistrationService.getInstance().reflect(expression);
+
+    UDFParameters parameters = new UDFParameters(expression, typeProvider);
+
+    try {
+      udtf.validate(new UDFParameterValidator(parameters));
+    } catch (Exception e) {
+      onError("validate(UDFParameterValidator)", e);
+    }
+
+    try {
+      udtf.beforeStart(parameters, configurations);
+    } catch (Exception e) {
+      onError("beforeStart(UDFParameters, UDTFConfigurations)", e);
+    }
+    configurations.check();
+
+    collector =
+        ElasticSerializableTVList.newElasticSerializableTVList(
+            configurations.getOutputDataType(), queryId, 
collectorMemoryBudgetInMB, 1);
+  }
+
+  // TODO: remove it after MPP finished
+  @Deprecated
   public void beforeStart(
       long queryId,
       float collectorMemoryBudgetInMB,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
index debc89d250..92f52963d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java
@@ -20,10 +20,10 @@
 package org.apache.iotdb.db.query.udf.core.layer;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -47,18 +47,20 @@ public class EvaluationDAGBuilder {
   // sub-expressions, but they can share the same point reader. we cache the point reader here to
   // make sure that only one point reader will be built for one expression.
   private final Map<Expression, IntermediateLayer> 
expressionIntermediateLayerMap;
-  private final Map<Expression, TSDataType> expressionDataTypeMap;
+  private final TypeProvider typeProvider;
 
   public EvaluationDAGBuilder(
       long queryId,
       RawQueryInputLayer inputLayer,
       Expression[] outputExpressions,
       UDTFContext udtfContext,
+      TypeProvider typeProvider,
       float memoryBudgetInMB) {
     this.queryId = queryId;
     this.inputLayer = inputLayer;
     this.outputExpressions = outputExpressions;
     this.udtfContext = udtfContext;
+    this.typeProvider = typeProvider;
 
     int size = inputLayer.getInputColumnCount();
     outputPointReaders = new LayerPointReader[size];
@@ -66,7 +68,6 @@ public class EvaluationDAGBuilder {
     memoryAssigner = new LayerMemoryAssigner(memoryBudgetInMB);
 
     expressionIntermediateLayerMap = new HashMap<>();
-    expressionDataTypeMap = new HashMap<>();
   }
 
   public EvaluationDAGBuilder buildLayerMemoryAssigner() {
@@ -87,7 +88,7 @@ public class EvaluationDAGBuilder {
                   udtfContext,
                   inputLayer,
                   expressionIntermediateLayerMap,
-                  expressionDataTypeMap,
+                  typeProvider,
                   memoryAssigner)
               .constructPointReader();
     }

Reply via email to