This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch trasnsform-operator-bugfix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5a3e4512f48a28cc6ededb13933ff6aa5b30cdf4 Author: Steve Yurong Su <[email protected]> AuthorDate: Mon May 16 11:53:21 2022 +0800 bind transformer input column with operator column input index --- .../db/mpp/execution/operator/process/FilterOperator.java | 9 +++++++-- .../mpp/execution/operator/process/TransformOperator.java | 9 +++++++-- .../iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java | 4 ++++ .../org/apache/iotdb/db/query/expression/Expression.java | 6 ++++++ .../db/query/expression/binary/BinaryExpression.java | 15 ++++++++++++++- .../iotdb/db/query/expression/leaf/ConstantOperand.java | 7 +++++++ .../iotdb/db/query/expression/leaf/TimeSeriesOperand.java | 10 ++++++++++ .../iotdb/db/query/expression/leaf/TimestampOperand.java | 7 +++++++ .../db/query/expression/multi/FunctionExpression.java | 14 ++++++++++++++ .../iotdb/db/query/expression/unary/UnaryExpression.java | 12 ++++++++++++ .../db/query/udf/core/layer/EvaluationDAGBuilder.java | 12 ++++++++++++ 11 files changed, 100 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java index 7da3d8f34a..f9eafceb03 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java @@ -23,6 +23,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; @@ -36,6 +37,7 @@ import java.io.IOException; import java.time.ZoneId; import java.util.ArrayList; import java.util.List; +import java.util.Map; public class FilterOperator extends TransformOperator { @@ -45,6 +47,7 @@ public class FilterOperator extends TransformOperator { OperatorContext operatorContext, Operator inputOperator, List<TSDataType> inputDataTypes, + Map<String, List<InputLocation>> inputLocations, Expression filterExpression, Expression[] outputExpressions, boolean keepNull, @@ -55,6 +58,7 @@ public class FilterOperator extends TransformOperator { operatorContext, inputOperator, inputDataTypes, + inputLocations, bindExpressions(filterExpression, outputExpressions), keepNull, zoneId, @@ -70,9 +74,10 @@ public class FilterOperator extends TransformOperator { } @Override - protected void initTransformers(TypeProvider typeProvider) + protected void initTransformers( + Map<String, List<InputLocation>> inputLocations, TypeProvider typeProvider) throws QueryProcessException, IOException { - super.initTransformers(typeProvider); + super.initTransformers(inputLocations, typeProvider); filterPointReader = transformers[transformers.length - 1]; if (filterPointReader.getDataType() != TSDataType.BOOLEAN) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java index 1b96943039..7a99fae9bf 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; import org.apache.iotdb.db.query.udf.core.layer.EvaluationDAGBuilder; @@ -46,6 +47,7 @@ import java.io.IOException; import java.time.ZoneId; import java.util.ArrayList; import java.util.List; +import java.util.Map; public class TransformOperator implements ProcessOperator { @@ -75,6 +77,7 @@ public class TransformOperator implements ProcessOperator { OperatorContext operatorContext, Operator inputOperator, List<TSDataType> inputDataTypes, + Map<String, List<InputLocation>> inputLocations, Expression[] outputExpressions, boolean keepNull, ZoneId zoneId, @@ -88,7 +91,7 @@ public class TransformOperator implements ProcessOperator { initInputLayer(inputDataTypes); initUdtfContext(zoneId); - initTransformers(typeProvider); + initTransformers(inputLocations, typeProvider); readyForFirstIteration(); } @@ -105,7 +108,8 @@ public class TransformOperator implements ProcessOperator { udtfContext.constructUdfExecutors(outputExpressions); } - protected void initTransformers(TypeProvider typeProvider) + protected void initTransformers( + Map<String, List<InputLocation>> inputLocations, TypeProvider typeProvider) throws QueryProcessException, IOException { UDFRegistrationService.getInstance().acquireRegistrationLock(); try { @@ -116,6 +120,7 @@ public class TransformOperator implements ProcessOperator { new EvaluationDAGBuilder( operatorContext.getOperatorId(), inputLayer, + inputLocations, outputExpressions, udtfContext, typeProvider, diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index 3273638f04..492ab8f029 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -455,12 +455,14 @@ public class LocalExecutionPlanner { TransformNode.class.getSimpleName()); final Operator inputOperator = generateOnlyChildOperator(node, context); final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, context.getTypeProvider()); + final Map<String, List<InputLocation>> inputLocations = makeLayout(node); try { return new TransformOperator( operatorContext, inputOperator, inputDataTypes, + inputLocations, node.getOutputExpressions(), node.isKeepNull(), node.getZoneId(), @@ -477,12 +479,14 @@ public class LocalExecutionPlanner { context.getNextOperatorId(), node.getPlanNodeId(), FilterNode.class.getSimpleName()); final Operator inputOperator = generateOnlyChildOperator(node, context); final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, context.getTypeProvider()); + final Map<String, List<InputLocation>> inputLocations = makeLayout(node); try { return new FilterOperator( operatorContext, inputOperator, inputDataTypes, + inputLocations, node.getPredicate(), node.getOutputExpressions(), node.isKeepNull(), diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java index 57857be4bb..28698ab133 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.LogicalOptimizeException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.expression.binary.AdditionExpression; import org.apache.iotdb.db.query.expression.binary.DivisionExpression; @@ -135,8 +136,13 @@ public abstract class Expression { protected Integer inputColumnIndex = null; + // TODO: remove after MPP finish + @Deprecated public abstract void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan); + public abstract void bindInputLayerColumnIndexWithExpression( + Map<String, List<InputLocation>> inputLocations); + public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner); // TODO: remove after MPP finish diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java index 34a9d11b1a..6dac04bf29 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.query.LogicalOptimizeException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; @@ -192,12 +193,24 @@ public abstract class BinaryExpression extends Expression { } @Override - public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) { + public final void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) { leftExpression.bindInputLayerColumnIndexWithExpression(udtfPlan); rightExpression.bindInputLayerColumnIndexWithExpression(udtfPlan); inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString()); } + @Override + public final void bindInputLayerColumnIndexWithExpression( + Map<String, List<InputLocation>> inputLocations) { + leftExpression.bindInputLayerColumnIndexWithExpression(inputLocations); + rightExpression.bindInputLayerColumnIndexWithExpression(inputLocations); + + final String digest = toString(); + if (inputLocations.containsKey(digest)) { + inputColumnIndex = inputLocations.get(digest).get(0).getValueColumnIndex(); + } + } + @Override public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) { leftExpression.updateStatisticsForMemoryAssigner(memoryAssigner); diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java index 90533d653f..8a6c4589c0 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.expression.leaf; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.expression.ExpressionType; @@ -104,6 +105,12 @@ public class ConstantOperand extends LeafOperand { // Do nothing } + @Override + public void bindInputLayerColumnIndexWithExpression( + Map<String, List<InputLocation>> inputLocations) { + // Do nothing + } + @Override public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) { // Do nothing diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java index e8e6aa8cc2..cdab2e4adc 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.query.LogicalOptimizeException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.path.PathDeserializeUtil; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.expression.ExpressionType; @@ -99,6 +100,15 @@ public class TimeSeriesOperand extends LeafOperand { inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString()); } + @Override + public void bindInputLayerColumnIndexWithExpression( + Map<String, List<InputLocation>> inputLocations) { + final String digest = toString(); + if (inputLocations.containsKey(digest)) { + inputColumnIndex = inputLocations.get(digest).get(0).getValueColumnIndex(); + } + } + @Override public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) { memoryAssigner.increaseExpressionReference(this); diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java index dbea065fe4..8c9f6c741d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.query.LogicalOptimizeException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.expression.ExpressionType; @@ -87,6 +88,12 @@ public class TimestampOperand extends LeafOperand { // do nothing } + @Override + public void bindInputLayerColumnIndexWithExpression( + Map<String, List<InputLocation>> inputLocations) { + // do nothing + } + @Override public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) { memoryAssigner.increaseExpressionReference(this); diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java index c63c08083e..caa87859e6 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.query.LogicalOptimizeException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.qp.constant.SQLConstant; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer; @@ -279,6 +280,19 @@ public class FunctionExpression extends Expression { inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString()); } + @Override + public void bindInputLayerColumnIndexWithExpression( + Map<String, List<InputLocation>> inputLocations) { + for (Expression expression : expressions) { + expression.bindInputLayerColumnIndexWithExpression(inputLocations); + } + + final String digest = toString(); + if (inputLocations.containsKey(digest)) { + inputColumnIndex = inputLocations.get(digest).get(0).getValueColumnIndex(); + } + } + @Override public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) { for (Expression expression : expressions) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java index 249398fc6d..02ec49a4b1 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.query.LogicalOptimizeException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.qp.utils.WildcardsRemover; import org.apache.iotdb.db.query.expression.Expression; @@ -96,6 +97,17 @@ public abstract class UnaryExpression extends Expression { inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString()); } + @Override + public final void bindInputLayerColumnIndexWithExpression( + Map<String, List<InputLocation>> inputLocations) { + expression.bindInputLayerColumnIndexWithExpression(inputLocations); + + final String digest = toString(); + if (inputLocations.containsKey(digest)) { + inputColumnIndex = inputLocations.get(digest).get(0).getValueColumnIndex(); + } + } + @Override public final void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) { expression.updateStatisticsForMemoryAssigner(memoryAssigner); diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java index 92f52963d5..80479da2d1 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java @@ -21,12 +21,14 @@ package org.apache.iotdb.db.query.udf.core.layer; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; public class EvaluationDAGBuilder { @@ -34,6 +36,7 @@ public class EvaluationDAGBuilder { private final long queryId; private final RawQueryInputLayer inputLayer; + private final Map<String, List<InputLocation>> inputLocations; private final Expression[] outputExpressions; private final LayerPointReader[] outputPointReaders; @@ -52,12 +55,14 @@ public class EvaluationDAGBuilder { public EvaluationDAGBuilder( long queryId, RawQueryInputLayer inputLayer, + Map<String, List<InputLocation>> inputLocations, Expression[] outputExpressions, UDTFContext udtfContext, TypeProvider typeProvider, float memoryBudgetInMB) { this.queryId = queryId; this.inputLayer = inputLayer; + this.inputLocations = inputLocations; this.outputExpressions = outputExpressions; this.udtfContext = udtfContext; this.typeProvider = typeProvider; @@ -78,6 +83,13 @@ public class EvaluationDAGBuilder { return this; } + public EvaluationDAGBuilder bindInputLayerColumnIndexWithExpression() { + for (Expression expression : outputExpressions) { + expression.bindInputLayerColumnIndexWithExpression(inputLocations); + } + return this; + } + public EvaluationDAGBuilder buildResultColumnPointReaders() throws QueryProcessException, IOException { for (int i = 0; i < outputExpressions.length; ++i) {
