This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch trasnsform-operator-bugfix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 279f96667d85240f0d16df4e9be92bf9e5c7da9d Author: Steve Yurong Su <[email protected]> AuthorDate: Mon May 16 11:36:20 2022 +0800 reused TypeProvider in converting Expression->Transformer stage --- .../execution/operator/process/FilterOperator.java | 5 +- .../operator/process/TransformOperator.java | 13 +-- .../iotdb/db/query/expression/Expression.java | 11 +++ .../query/expression/binary/BinaryExpression.java | 49 ++++++++++ .../db/query/expression/leaf/ConstantOperand.java | 19 ++++ .../query/expression/leaf/TimeSeriesOperand.java | 28 ++++++ .../db/query/expression/leaf/TimestampOperand.java | 26 ++++++ .../query/expression/multi/FunctionExpression.java | 101 +++++++++++++++++++++ .../db/query/expression/unary/UnaryExpression.java | 38 ++++++++ .../db/query/udf/core/executor/UDTFExecutor.java | 27 ++++++ .../query/udf/core/layer/EvaluationDAGBuilder.java | 9 +- 11 files changed, 311 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java index 79e5e2fd2f..7da3d8f34a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java @@ -70,8 +70,9 @@ public class FilterOperator extends TransformOperator { } @Override - protected void initTransformers() throws QueryProcessException, IOException { - super.initTransformers(); + protected void initTransformers(TypeProvider typeProvider) + throws QueryProcessException, IOException { + super.initTransformers(typeProvider); filterPointReader = transformers[transformers.length - 1]; if (filterPointReader.getDataType() != TSDataType.BOOLEAN) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java index 7d47611f6d..1b96943039 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java @@ -88,9 +88,8 @@ public class TransformOperator implements ProcessOperator { initInputLayer(inputDataTypes); initUdtfContext(zoneId); - initTransformers(); + initTransformers(typeProvider); readyForFirstIteration(); - updateTypeProvider(typeProvider); } private void initInputLayer(List<TSDataType> inputDataTypes) throws QueryProcessException { @@ -106,7 +105,8 @@ public class TransformOperator implements ProcessOperator { udtfContext.constructUdfExecutors(outputExpressions); } - protected void initTransformers() throws QueryProcessException, IOException { + protected void initTransformers(TypeProvider typeProvider) + throws QueryProcessException, IOException { UDFRegistrationService.getInstance().acquireRegistrationLock(); try { // This statement must be surrounded by the registration lock. @@ -118,6 +118,7 @@ public class TransformOperator implements ProcessOperator { inputLayer, outputExpressions, udtfContext, + typeProvider, udfTransformerMemoryBudgetInMB + udfCollectorMemoryBudgetInMB) .buildLayerMemoryAssigner() .buildResultColumnPointReaders() @@ -149,12 +150,6 @@ public class TransformOperator implements ProcessOperator { } } - private void updateTypeProvider(TypeProvider typeProvider) { - for (int i = 0; i < transformers.length; ++i) { - typeProvider.setType(outputExpressions[i].toString(), transformers[i].getDataType()); - } - } - @Override public boolean hasNext() { return !timeHeap.isEmpty(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java index 4dceb6106e..57857be4bb 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java @@ -139,6 +139,8 @@ public abstract class Expression { public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner); + // TODO: remove after MPP finish + @Deprecated public abstract IntermediateLayer constructIntermediateLayer( long queryId, UDTFContext udtfContext, @@ -148,6 +150,15 @@ public abstract class Expression { LayerMemoryAssigner memoryAssigner) throws QueryProcessException, IOException; + public abstract IntermediateLayer constructIntermediateLayer( + long queryId, + UDTFContext udtfContext, + RawQueryInputLayer rawTimeSeriesInputLayer, + Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, + TypeProvider typeProvider, + LayerMemoryAssigner memoryAssigner) + throws QueryProcessException, IOException; + ///////////////////////////////////////////////////////////////////////////////////////////////// // isConstantOperand ///////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java index d9a5836d9b..34a9d11b1a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.expression.binary; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.query.LogicalOptimizeException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; @@ -253,6 +254,54 @@ public abstract class BinaryExpression extends Expression { return expressionIntermediateLayerMap.get(this); } + @Override + public IntermediateLayer constructIntermediateLayer( + long queryId, + UDTFContext udtfContext, + RawQueryInputLayer rawTimeSeriesInputLayer, + Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, + TypeProvider typeProvider, + LayerMemoryAssigner memoryAssigner) + throws QueryProcessException, IOException { + if (!expressionIntermediateLayerMap.containsKey(this)) { + float memoryBudgetInMB = memoryAssigner.assign(); + + IntermediateLayer leftParentIntermediateLayer = + leftExpression.constructIntermediateLayer( + queryId, + udtfContext, + rawTimeSeriesInputLayer, + expressionIntermediateLayerMap, + typeProvider, + memoryAssigner); + IntermediateLayer rightParentIntermediateLayer = + rightExpression.constructIntermediateLayer( + queryId, + udtfContext, + rawTimeSeriesInputLayer, + expressionIntermediateLayerMap, + typeProvider, + memoryAssigner); + Transformer transformer = + constructTransformer( + leftParentIntermediateLayer.constructPointReader(), + rightParentIntermediateLayer.constructPointReader()); + + // SingleInputColumnMultiReferenceIntermediateLayer doesn't support ConstantLayerPointReader + // yet. And since a ConstantLayerPointReader won't produce too much IO, + // SingleInputColumnSingleReferenceIntermediateLayer could be a better choice. + expressionIntermediateLayerMap.put( + this, + memoryAssigner.getReference(this) == 1 || isConstantOperand() + ? new SingleInputColumnSingleReferenceIntermediateLayer( + this, queryId, memoryBudgetInMB, transformer) + : new SingleInputColumnMultiReferenceIntermediateLayer( + this, queryId, memoryBudgetInMB, transformer)); + } + + return expressionIntermediateLayerMap.get(this); + } + protected abstract BinaryTransformer constructTransformer( LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader); diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java index 7acd877fc0..90533d653f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/ConstantOperand.java @@ -35,6 +35,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.commons.lang3.Validate; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -127,6 +128,24 @@ public class ConstantOperand extends LeafOperand { return expressionIntermediateLayerMap.get(this); } + @Override + public IntermediateLayer constructIntermediateLayer( + long queryId, + UDTFContext udtfContext, + RawQueryInputLayer rawTimeSeriesInputLayer, + Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, + TypeProvider typeProvider, + LayerMemoryAssigner memoryAssigner) + throws QueryProcessException, IOException { + if (!expressionIntermediateLayerMap.containsKey(this)) { + IntermediateLayer intermediateLayer = + new ConstantIntermediateLayer(this, queryId, memoryAssigner.assign()); + expressionIntermediateLayerMap.put(this, intermediateLayer); + } + + return expressionIntermediateLayerMap.get(this); + } + @Override public String getExpressionStringInternal() { return valueString; diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java index 5d896a932f..e8e6aa8cc2 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimeSeriesOperand.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.query.udf.core.layer.SingleInputColumnSingleReference import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; @@ -131,6 +132,33 @@ public class TimeSeriesOperand extends LeafOperand { return expressionIntermediateLayerMap.get(this); } + @Override + public IntermediateLayer constructIntermediateLayer( + long queryId, + UDTFContext udtfContext, + RawQueryInputLayer rawTimeSeriesInputLayer, + Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, + TypeProvider typeProvider, + LayerMemoryAssigner memoryAssigner) + throws QueryProcessException, IOException { + if (!expressionIntermediateLayerMap.containsKey(this)) { + float memoryBudgetInMB = memoryAssigner.assign(); + + LayerPointReader parentLayerPointReader = + rawTimeSeriesInputLayer.constructValuePointReader(inputColumnIndex); + + expressionIntermediateLayerMap.put( + this, + memoryAssigner.getReference(this) == 1 + ? new SingleInputColumnSingleReferenceIntermediateLayer( + this, queryId, memoryBudgetInMB, parentLayerPointReader) + : new SingleInputColumnMultiReferenceIntermediateLayer( + this, queryId, memoryBudgetInMB, parentLayerPointReader)); + } + + return expressionIntermediateLayerMap.get(this); + } + @Override public String getExpressionStringInternal() { return path.isMeasurementAliasExists() ? path.getFullPathWithAlias() : path.getFullPath(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java index bb473451b7..dbea065fe4 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/leaf/TimestampOperand.java @@ -119,6 +119,32 @@ public class TimestampOperand extends LeafOperand { return expressionIntermediateLayerMap.get(this); } + @Override + public IntermediateLayer constructIntermediateLayer( + long queryId, + UDTFContext udtfContext, + RawQueryInputLayer rawTimeSeriesInputLayer, + Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, + TypeProvider typeProvider, + LayerMemoryAssigner memoryAssigner) + throws QueryProcessException, IOException { + if (!expressionIntermediateLayerMap.containsKey(this)) { + float memoryBudgetInMB = memoryAssigner.assign(); + + LayerPointReader parentLayerPointReader = rawTimeSeriesInputLayer.constructTimePointReader(); + + expressionIntermediateLayerMap.put( + this, + memoryAssigner.getReference(this) == 1 + ? new SingleInputColumnSingleReferenceIntermediateLayer( + this, queryId, memoryBudgetInMB, parentLayerPointReader) + : new SingleInputColumnMultiReferenceIntermediateLayer( + this, queryId, memoryBudgetInMB, parentLayerPointReader)); + } + + return expressionIntermediateLayerMap.get(this); + } + @Override protected boolean isConstantOperandInternal() { return false; diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java index 0ec226403f..c63c08083e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/multi/FunctionExpression.java @@ -287,6 +287,103 @@ public class FunctionExpression extends Expression { } } + @Override + public IntermediateLayer constructIntermediateLayer( + long queryId, + UDTFContext udtfContext, + RawQueryInputLayer rawTimeSeriesInputLayer, + Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, + TypeProvider typeProvider, + LayerMemoryAssigner memoryAssigner) + throws QueryProcessException, IOException { + if (!expressionIntermediateLayerMap.containsKey(this)) { + float memoryBudgetInMB = memoryAssigner.assign(); + Transformer transformer; + if (isBuiltInAggregationFunctionExpression) { + transformer = + new TransparentTransformer( + rawTimeSeriesInputLayer.constructValuePointReader(inputColumnIndex)); + } else { + IntermediateLayer udfInputIntermediateLayer = + constructUdfInputIntermediateLayer( + queryId, + udtfContext, + rawTimeSeriesInputLayer, + expressionIntermediateLayerMap, + typeProvider, + memoryAssigner); + transformer = + constructUdfTransformer( + queryId, udtfContext, typeProvider, memoryAssigner, udfInputIntermediateLayer); + } + expressionIntermediateLayerMap.put( + this, + memoryAssigner.getReference(this) == 1 + ? new SingleInputColumnSingleReferenceIntermediateLayer( + this, queryId, memoryBudgetInMB, transformer) + : new SingleInputColumnMultiReferenceIntermediateLayer( + this, queryId, memoryBudgetInMB, transformer)); + } + + return expressionIntermediateLayerMap.get(this); + } + + private IntermediateLayer constructUdfInputIntermediateLayer( + long queryId, + UDTFContext udtfContext, + RawQueryInputLayer rawTimeSeriesInputLayer, + Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, + TypeProvider typeProvider, + LayerMemoryAssigner memoryAssigner) + throws QueryProcessException, IOException { + List<IntermediateLayer> intermediateLayers = new ArrayList<>(); + for (Expression expression : expressions) { + intermediateLayers.add( + expression.constructIntermediateLayer( + queryId, + udtfContext, + rawTimeSeriesInputLayer, + expressionIntermediateLayerMap, + typeProvider, + memoryAssigner)); + } + return intermediateLayers.size() == 1 + ? intermediateLayers.get(0) + : new MultiInputColumnIntermediateLayer( + this, + queryId, + memoryAssigner.assign(), + intermediateLayers.stream() + .map(IntermediateLayer::constructPointReader) + .collect(Collectors.toList())); + } + + private UDFQueryTransformer constructUdfTransformer( + long queryId, + UDTFContext udtfContext, + TypeProvider typeProvider, + LayerMemoryAssigner memoryAssigner, + IntermediateLayer udfInputIntermediateLayer) + throws QueryProcessException, IOException { + UDTFExecutor executor = udtfContext.getExecutorByFunctionExpression(this); + + executor.beforeStart(queryId, memoryAssigner.assign(), typeProvider); + + AccessStrategy accessStrategy = executor.getConfigurations().getAccessStrategy(); + switch (accessStrategy.getAccessStrategyType()) { + case ROW_BY_ROW: + return new UDFQueryRowTransformer(udfInputIntermediateLayer.constructRowReader(), executor); + case SLIDING_SIZE_WINDOW: + case SLIDING_TIME_WINDOW: + return new UDFQueryRowWindowTransformer( + udfInputIntermediateLayer.constructRowWindowReader( + accessStrategy, memoryAssigner.assign()), + executor); + default: + throw new UnsupportedOperationException("Unsupported transformer access strategy"); + } + } + @Override public IntermediateLayer constructIntermediateLayer( long queryId, @@ -333,6 +430,8 @@ public class FunctionExpression extends Expression { return expressionIntermediateLayerMap.get(this); } + // TODO: remove it after MPP finished + @Deprecated private IntermediateLayer constructUdfInputIntermediateLayer( long queryId, UDTFContext udtfContext, @@ -363,6 +462,8 @@ public class FunctionExpression extends Expression { .collect(Collectors.toList())); } + // TODO: remove it after MPP finished + @Deprecated private UDFQueryTransformer constructUdfTransformer( long queryId, UDTFContext udtfContext, diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java index f82d980e7c..249398fc6d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/UnaryExpression.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.expression.unary; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.query.LogicalOptimizeException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.qp.utils.WildcardsRemover; import org.apache.iotdb.db.query.expression.Expression; @@ -139,6 +140,43 @@ public abstract class UnaryExpression extends Expression { return expressionIntermediateLayerMap.get(this); } + @Override + public IntermediateLayer constructIntermediateLayer( + long queryId, + UDTFContext udtfContext, + RawQueryInputLayer rawTimeSeriesInputLayer, + Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, + TypeProvider typeProvider, + LayerMemoryAssigner memoryAssigner) + throws QueryProcessException, IOException { + if (!expressionIntermediateLayerMap.containsKey(this)) { + float memoryBudgetInMB = memoryAssigner.assign(); + + IntermediateLayer parentLayerPointReader = + expression.constructIntermediateLayer( + queryId, + udtfContext, + rawTimeSeriesInputLayer, + expressionIntermediateLayerMap, + typeProvider, + memoryAssigner); + Transformer transformer = constructTransformer(parentLayerPointReader.constructPointReader()); + + // SingleInputColumnMultiReferenceIntermediateLayer doesn't support ConstantLayerPointReader + // yet. And since a ConstantLayerPointReader won't produce too much IO, + // SingleInputColumnSingleReferenceIntermediateLayer could be a better choice. + expressionIntermediateLayerMap.put( + this, + memoryAssigner.getReference(this) == 1 || isConstantOperand() + ? new SingleInputColumnSingleReferenceIntermediateLayer( + this, queryId, memoryBudgetInMB, transformer) + : new SingleInputColumnMultiReferenceIntermediateLayer( + this, queryId, memoryBudgetInMB, transformer)); + } + + return expressionIntermediateLayerMap.get(this); + } + protected abstract Transformer constructTransformer(LayerPointReader pointReader); @Override diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java index 83cb5990a0..b8f697f6b7 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.udf.core.executor; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.expression.multi.FunctionExpression; import org.apache.iotdb.db.query.udf.api.UDTF; @@ -53,6 +54,32 @@ public class UDTFExecutor { configurations = new UDTFConfigurations(zoneId); } + public void beforeStart(long queryId, float collectorMemoryBudgetInMB, TypeProvider typeProvider) + throws QueryProcessException { + udtf = (UDTF) UDFRegistrationService.getInstance().reflect(expression); + + UDFParameters parameters = new UDFParameters(expression, typeProvider); + + try { + udtf.validate(new UDFParameterValidator(parameters)); + } catch (Exception e) { + onError("validate(UDFParameterValidator)", e); + } + + try { + udtf.beforeStart(parameters, configurations); + } catch (Exception e) { + onError("beforeStart(UDFParameters, UDTFConfigurations)", e); + } + configurations.check(); + + collector = + ElasticSerializableTVList.newElasticSerializableTVList( + configurations.getOutputDataType(), queryId, collectorMemoryBudgetInMB, 1); + } + + // TODO: remove it after MPP finished + @Deprecated public void beforeStart( long queryId, float collectorMemoryBudgetInMB, diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java index debc89d250..92f52963d5 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/EvaluationDAGBuilder.java @@ -20,10 +20,10 @@ package org.apache.iotdb.db.query.udf.core.layer; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import java.io.IOException; import java.util.HashMap; @@ -47,18 +47,20 @@ public class EvaluationDAGBuilder { // sub-expressions, but they can share the same point reader. we cache the point reader here to // make sure that only one point reader will be built for one expression. private final Map<Expression, IntermediateLayer> expressionIntermediateLayerMap; - private final Map<Expression, TSDataType> expressionDataTypeMap; + private final TypeProvider typeProvider; public EvaluationDAGBuilder( long queryId, RawQueryInputLayer inputLayer, Expression[] outputExpressions, UDTFContext udtfContext, + TypeProvider typeProvider, float memoryBudgetInMB) { this.queryId = queryId; this.inputLayer = inputLayer; this.outputExpressions = outputExpressions; this.udtfContext = udtfContext; + this.typeProvider = typeProvider; int size = inputLayer.getInputColumnCount(); outputPointReaders = new LayerPointReader[size]; @@ -66,7 +68,6 @@ public class EvaluationDAGBuilder { memoryAssigner = new LayerMemoryAssigner(memoryBudgetInMB); expressionIntermediateLayerMap = new HashMap<>(); - expressionDataTypeMap = new HashMap<>(); } public EvaluationDAGBuilder buildLayerMemoryAssigner() { @@ -87,7 +88,7 @@ public class EvaluationDAGBuilder { udtfContext, inputLayer, expressionIntermediateLayerMap, - expressionDataTypeMap, + typeProvider, memoryAssigner) .constructPointReader(); }
