This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch trasnsform-operator-bugfix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 666ee799e867848c507023ee59f47f9c4505c9d9 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon May 16 12:15:01 2022 +0800 bind input types --- .../mpp/execution/operator/process/FilterOperator.java | 6 ++++-- .../execution/operator/process/TransformOperator.java | 17 ++++++++--------- .../db/mpp/plan/planner/LocalExecutionPlanner.java | 12 ++++++++++-- .../db/query/udf/core/layer/EvaluationDAGBuilder.java | 7 ++++--- 4 files changed, 26 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java index 54023374cf..8522d065cc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java @@ -75,9 +75,11 @@ public class FilterOperator extends TransformOperator { @Override protected void initTransformers( - Map<String, List<InputLocation>> inputLocations, TypeProvider typeProvider) + Map<String, List<InputLocation>> inputLocations, + Expression[] outputExpressions, + TypeProvider typeProvider) throws QueryProcessException, IOException { - super.initTransformers(inputLocations, typeProvider); + super.initTransformers(inputLocations, outputExpressions, typeProvider); filterPointReader = transformers[transformers.length - 1]; if (filterPointReader.getDataType() != TSDataType.BOOLEAN) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java index de3cedcc3c..784573cd1b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java @@ -63,8 +63,6 @@ public class TransformOperator implements ProcessOperator { protected final OperatorContext operatorContext; protected final Operator inputOperator; - protected final List<TSDataType> inputDataTypes; - protected final Expression[] outputExpressions; protected final boolean keepNull; protected boolean isFirstIteration; @@ -87,15 +85,13 @@ public class TransformOperator implements ProcessOperator { throws QueryProcessException, IOException { this.operatorContext = operatorContext; this.inputOperator = inputOperator; - this.inputDataTypes = inputDataTypes; - this.outputExpressions = outputExpressions; this.keepNull = keepNull; isFirstIteration = true; initInputLayer(inputDataTypes); - initUdtfContext(zoneId); - initTransformers(inputLocations, typeProvider); + initUdtfContext(outputExpressions, zoneId); + initTransformers(inputLocations, outputExpressions, typeProvider); } private void initInputLayer(List<TSDataType> inputDataTypes) throws QueryProcessException { @@ -106,13 +102,15 @@ public class TransformOperator implements ProcessOperator { new TsBlockInputDataSet(inputOperator, inputDataTypes)); } - private void initUdtfContext(ZoneId zoneId) { + private void initUdtfContext(Expression[] outputExpressions, ZoneId zoneId) { udtfContext = new UDTFContext(zoneId); udtfContext.constructUdfExecutors(outputExpressions); } protected void initTransformers( - Map<String, List<InputLocation>> inputLocations, TypeProvider typeProvider) + Map<String, List<InputLocation>> inputLocations, + Expression[] outputExpressions, + TypeProvider typeProvider) throws QueryProcessException, IOException { UDFRegistrationService.getInstance().acquireRegistrationLock(); try { @@ -125,10 +123,11 @@ public class TransformOperator implements ProcessOperator { inputLayer, inputLocations, outputExpressions, - udtfContext, typeProvider, + udtfContext, udfTransformerMemoryBudgetInMB + udfCollectorMemoryBudgetInMB) .buildLayerMemoryAssigner() + .bindInputLayerColumnIndexWithExpression() .buildResultColumnPointReaders() .getOutputPointReaders(); } finally { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index 492ab8f029..6a8c5956e2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -454,7 +454,7 @@ public class LocalExecutionPlanner { node.getPlanNodeId(), TransformNode.class.getSimpleName()); final Operator inputOperator = generateOnlyChildOperator(node, context); - final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, context.getTypeProvider()); + final List<TSDataType> inputDataTypes = getInputColumnTypes(node, context.getTypeProvider()); final Map<String, List<InputLocation>> inputLocations = makeLayout(node); try { @@ -478,7 +478,7 @@ public class LocalExecutionPlanner { context.instanceContext.addOperatorContext( context.getNextOperatorId(), node.getPlanNodeId(), FilterNode.class.getSimpleName()); final Operator inputOperator = generateOnlyChildOperator(node, context); - final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, context.getTypeProvider()); + final List<TSDataType> inputDataTypes = getInputColumnTypes(node, context.getTypeProvider()); final Map<String, List<InputLocation>> inputLocations = makeLayout(node); try { @@ -680,6 +680,14 @@ public class LocalExecutionPlanner { return outputMappings; } + private List<TSDataType> getInputColumnTypes(PlanNode node, TypeProvider typeProvider) { + return node.getChildren().stream() + .map(PlanNode::getOutputColumnNames) + .flatMap(List::stream) + .map(typeProvider::getType) + .collect(Collectors.toList()); + } + private List<TSDataType> getOutputColumnTypes(PlanNode node, TypeProvider typeProvider) { return node.getOutputColumnNames().stream() .map(typeProvider::getType) diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java index 80479da2d1..003447f0e4 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java @@ -41,6 +41,8 @@ public class EvaluationDAGBuilder { private final Expression[] outputExpressions; private final LayerPointReader[] outputPointReaders; + private final TypeProvider typeProvider; + private final UDTFContext udtfContext; private final LayerMemoryAssigner memoryAssigner; @@ -50,22 +52,21 @@ public class EvaluationDAGBuilder { // sub-expressions, but they can share the same point reader. we cache the point reader here to // make sure that only one point reader will be built for one expression. private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap; - private final TypeProvider typeProvider; public EvaluationDAGBuilder( long queryId, RawQueryInputLayer inputLayer, Map<String, List<InputLocation>> inputLocations, Expression[] outputExpressions, - UDTFContext udtfContext, TypeProvider typeProvider, + UDTFContext udtfContext, float memoryBudgetInMB) { this.queryId = queryId; this.inputLayer = inputLayer; this.inputLocations = inputLocations; this.outputExpressions = outputExpressions; - this.udtfContext = udtfContext; this.typeProvider = typeProvider; + this.udtfContext = udtfContext; int size = inputLayer.getInputColumnCount(); outputPointReaders = new LayerPointReader[size];
