This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch udf-operator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 14491af4dfe18a4118f184cbef41f198b757f103 Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Apr 12 21:00:36 2022 +0800 bind expr with input column index in another way --- .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 37 ++++--------- .../apache/iotdb/db/query/dataset/UDTFDataSet.java | 3 +- .../iotdb/db/query/expression/Expression.java | 15 ++---- .../query/expression/binary/BinaryExpression.java | 14 +++-- .../db/query/expression/unary/ConstantOperand.java | 8 ++- .../query/expression/unary/FunctionExpression.java | 26 ++++++---- .../query/expression/unary/LogicNotExpression.java | 17 ++++-- .../query/expression/unary/NegationExpression.java | 11 +++- .../query/expression/unary/TimeSeriesOperand.java | 10 +++- .../db/query/udf/core/executor/UDTFContext.java | 60 ++++++++++++++++++++++ .../iotdb/db/query/udf/core/layer/DAGBuilder.java | 9 +++- .../query/udf/core/layer/RawQueryInputLayer.java | 4 ++ 12 files changed, 156 insertions(+), 58 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java index 976bfec456..e1c9dbadb7 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java @@ -25,9 +25,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.strategy.PhysicalGenerator; import org.apache.iotdb.db.query.expression.ResultColumn; -import org.apache.iotdb.db.query.expression.unary.FunctionExpression; -import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor; -import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager; +import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Pair; @@ -43,15 +41,14 @@ import java.util.Set; public class UDTFPlan extends RawDataQueryPlan implements UDFPlan { - protected final ZoneId zoneId; + protected final UDTFContext udtfContext; - protected Map<String, UDTFExecutor> expressionName2Executor = new HashMap<>(); - protected Map<Integer, Integer> datasetOutputIndexToResultColumnIndex = new HashMap<>(); - protected Map<String, Integer> pathNameToReaderIndex = new HashMap<>(); + protected final Map<Integer, Integer> datasetOutputIndexToResultColumnIndex = new HashMap<>(); + protected final Map<String, Integer> pathNameToReaderIndex = new HashMap<>(); public UDTFPlan(ZoneId zoneId) { super(); - this.zoneId = zoneId; + udtfContext = new UDTFContext(zoneId); setOperatorType(Operator.OperatorType.UDTF); } @@ -128,35 +125,23 @@ public class UDTFPlan extends RawDataQueryPlan implements UDFPlan { @Override public void constructUdfExecutors(List<ResultColumn> resultColumns) { - for (ResultColumn resultColumn : resultColumns) { - resultColumn.getExpression().constructUdfExecutors(expressionName2Executor, zoneId); - } + udtfContext.constructUdfExecutors(resultColumns); } @Override public void finalizeUDFExecutors(long queryId) { - try { - for (UDTFExecutor executor : expressionName2Executor.values()) { - executor.beforeDestroy(); - } - } finally { - UDFClassLoaderManager.getInstance().finalizeUDFQuery(queryId); - } + udtfContext.finalizeUDFExecutors(queryId); } public ResultColumn getResultColumnByDatasetOutputIndex(int datasetOutputIndex) { return resultColumns.get(datasetOutputIndexToResultColumnIndex.get(datasetOutputIndex)); } - public UDTFExecutor getExecutorByFunctionExpression(FunctionExpression functionExpression) { - return expressionName2Executor.get(functionExpression.getExpressionString()); - } - - public int getReaderIndex(String pathName) { - return pathNameToReaderIndex.get(pathName); + public Integer getReaderIndexByExpressionName(String expressionName) { + return pathNameToReaderIndex.get(expressionName); } - public int getReaderIndexByExpressionName(String expressionName) { - return pathNameToReaderIndex.get(expressionName); + public UDTFContext getUdtfContext() { + return udtfContext; } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java index 1fe27e7c14..c299a355f8 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/UDTFDataSet.java @@ -102,7 +102,7 @@ public abstract class UDTFDataSet extends QueryDataSet { initDataSetFields(); } - protected UDTFDataSet(QueryContext queryContext, UDTFPlan udtfPlan, IUDFInputDataSet dataSet) + public UDTFDataSet(QueryContext queryContext, UDTFPlan udtfPlan, IUDFInputDataSet dataSet) throws QueryProcessException, IOException { queryId = queryContext.getQueryId(); this.udtfPlan = udtfPlan; @@ -123,6 +123,7 @@ public abstract class UDTFDataSet extends QueryDataSet { udtfPlan, rawQueryInputLayer, UDF_TRANSFORMER_MEMORY_BUDGET_IN_MB + UDF_COLLECTOR_MEMORY_BUDGET_IN_MB) + .bindInputLayerColumnIndexWithExpression() .buildLayerMemoryAssigner() .buildResultColumnPointReaders() .setDataSetResultColumnDataTypes() diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java index 619167bf96..48f231c8fb 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree; import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.expression.unary.ConstantOperand; +import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor; import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer; import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner; @@ -49,7 +50,7 @@ public abstract class Expression { protected Boolean isConstantOperandCache = null; - protected Integer tsBlockInputColumnIndex = null; + protected Integer inputColumnIndex = null; public boolean isBuiltInAggregationFunctionExpression() { return false; @@ -63,14 +64,6 @@ public abstract class Expression { return false; } - public Integer getTsBlockInputColumnIndex() { - return tsBlockInputColumnIndex; - } - - public void setTsBlockInputColumnIndex(Integer tsBlockInputColumnIndex) { - this.tsBlockInputColumnIndex = tsBlockInputColumnIndex; - } - public abstract void concat( List<PartialPath> prefixPaths, List<Expression> resultExpressions, @@ -94,11 +87,13 @@ public abstract class Expression { public abstract void constructUdfExecutors( Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId); + public abstract void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan); + public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner); public abstract IntermediateLayer constructIntermediateLayer( long queryId, - UDTFPlan udtfPlan, + UDTFContext udtfContext, RawQueryInputLayer rawTimeSeriesInputLayer, Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, Map<Expression, TSDataType> expressionDataTypeMap, diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java index b706a52f5d..18eca2151b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree; import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.expression.Expression; +import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor; import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer; import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner; @@ -205,6 +206,13 @@ public abstract class BinaryExpression extends Expression { rightExpression.constructUdfExecutors(expressionName2Executor, zoneId); } + @Override + public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) { + leftExpression.bindInputLayerColumnIndexWithExpression(udtfPlan); + rightExpression.bindInputLayerColumnIndexWithExpression(udtfPlan); + inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString()); + } + @Override public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) { leftExpression.updateStatisticsForMemoryAssigner(memoryAssigner); @@ -215,7 +223,7 @@ public abstract class BinaryExpression extends Expression { @Override public IntermediateLayer constructIntermediateLayer( long queryId, - UDTFPlan udtfPlan, + UDTFContext udtfContext, RawQueryInputLayer rawTimeSeriesInputLayer, Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, Map<Expression, TSDataType> expressionDataTypeMap, @@ -227,7 +235,7 @@ public abstract class BinaryExpression extends Expression { IntermediateLayer leftParentIntermediateLayer = leftExpression.constructIntermediateLayer( queryId, - udtfPlan, + udtfContext, rawTimeSeriesInputLayer, expressionIntermediateLayerMap, expressionDataTypeMap, @@ -235,7 +243,7 @@ public abstract class BinaryExpression extends Expression { IntermediateLayer rightParentIntermediateLayer = rightExpression.constructIntermediateLayer( queryId, - udtfPlan, + udtfContext, rawTimeSeriesInputLayer, expressionIntermediateLayerMap, expressionDataTypeMap, diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java index 0d32cfbad5..e8e0088599 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree; import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.expression.Expression; +import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor; import org.apache.iotdb.db.query.udf.core.layer.ConstantIntermediateLayer; import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer; @@ -102,6 +103,11 @@ public class ConstantOperand extends Expression { // Do nothing } + @Override + public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) { + // Do nothing + } + @Override public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) { // Do nothing @@ -110,7 +116,7 @@ public class ConstantOperand extends Expression { @Override public IntermediateLayer constructIntermediateLayer( long queryId, - UDTFPlan udtfPlan, + UDTFContext udtfContext, RawQueryInputLayer rawTimeSeriesInputLayer, Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, Map<Expression, TSDataType> expressionDataTypeMap, diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java index 564ce5bb09..3683ff50fb 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.qp.strategy.optimizer.ConcatPathOptimizer; import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy; +import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor; import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer; import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner; @@ -246,6 +247,14 @@ public class FunctionExpression extends Expression { expressionName2Executor.put(expressionString, new UDTFExecutor(this, zoneId)); } + @Override + public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) { + for (Expression expression : expressions) { + expression.bindInputLayerColumnIndexWithExpression(udtfPlan); + } + inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString()); + } + @Override public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) { for (Expression expression : expressions) { @@ -257,7 +266,7 @@ public class FunctionExpression extends Expression { @Override public IntermediateLayer constructIntermediateLayer( long queryId, - UDTFPlan udtfPlan, + UDTFContext udtfContext, RawQueryInputLayer rawTimeSeriesInputLayer, Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, Map<Expression, TSDataType> expressionDataTypeMap, @@ -269,13 +278,12 @@ public class FunctionExpression extends Expression { if (isBuiltInAggregationFunctionExpression) { transformer = new TransparentTransformer( - rawTimeSeriesInputLayer.constructPointReader( - udtfPlan.getReaderIndexByExpressionName(toString()))); + rawTimeSeriesInputLayer.constructPointReader(inputColumnIndex)); } else { IntermediateLayer udfInputIntermediateLayer = constructUdfInputIntermediateLayer( queryId, - udtfPlan, + udtfContext, rawTimeSeriesInputLayer, expressionIntermediateLayerMap, expressionDataTypeMap, @@ -283,7 +291,7 @@ public class FunctionExpression extends Expression { transformer = constructUdfTransformer( queryId, - udtfPlan, + udtfContext, expressionDataTypeMap, memoryAssigner, udfInputIntermediateLayer); @@ -303,7 +311,7 @@ public class FunctionExpression extends Expression { private IntermediateLayer constructUdfInputIntermediateLayer( long queryId, - UDTFPlan udtfPlan, + UDTFContext udtfContext, RawQueryInputLayer rawTimeSeriesInputLayer, Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, Map<Expression, TSDataType> expressionDataTypeMap, @@ -314,7 +322,7 @@ public class FunctionExpression extends Expression { intermediateLayers.add( expression.constructIntermediateLayer( queryId, - udtfPlan, + udtfContext, rawTimeSeriesInputLayer, expressionIntermediateLayerMap, expressionDataTypeMap, @@ -333,12 +341,12 @@ public class FunctionExpression extends Expression { private UDFQueryTransformer constructUdfTransformer( long queryId, - UDTFPlan udtfPlan, + UDTFContext udtfContext, Map<Expression, TSDataType> expressionDataTypeMap, LayerMemoryAssigner memoryAssigner, IntermediateLayer udfInputIntermediateLayer) throws QueryProcessException, IOException { - UDTFExecutor executor = udtfPlan.getExecutorByFunctionExpression(this); + UDTFExecutor executor = udtfContext.getExecutorByFunctionExpression(this); executor.beforeStart(queryId, memoryAssigner.assign(), expressionDataTypeMap); diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java index 66893a0172..33fb9cb5a7 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.qp.utils.WildcardsRemover; import org.apache.iotdb.db.query.expression.Expression; +import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor; import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer; import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner; @@ -39,7 +40,11 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import java.io.IOException; import java.time.ZoneId; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; public class LogicNotExpression extends Expression { protected Expression expression; @@ -127,6 +132,12 @@ public class LogicNotExpression extends Expression { expression.constructUdfExecutors(expressionName2Executor, zoneId); } + @Override + public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) { + expression.bindInputLayerColumnIndexWithExpression(udtfPlan); + inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString()); + } + @Override public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) { expression.updateStatisticsForMemoryAssigner(memoryAssigner); @@ -136,7 +147,7 @@ public class LogicNotExpression extends Expression { @Override public IntermediateLayer constructIntermediateLayer( long queryId, - UDTFPlan udtfPlan, + UDTFContext udtfContext, RawQueryInputLayer rawTimeSeriesInputLayer, Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, Map<Expression, TSDataType> expressionDataTypeMap, @@ -148,7 +159,7 @@ public class LogicNotExpression extends Expression { IntermediateLayer parentLayerPointReader = expression.constructIntermediateLayer( queryId, - udtfPlan, + udtfContext, rawTimeSeriesInputLayer, expressionIntermediateLayerMap, expressionDataTypeMap, diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java index 04065daa55..5b8dccf1ae 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree; import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.expression.Expression; +import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor; import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer; import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner; @@ -132,6 +133,12 @@ public class NegationExpression extends Expression { expression.constructUdfExecutors(expressionName2Executor, zoneId); } + @Override + public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) { + expression.bindInputLayerColumnIndexWithExpression(udtfPlan); + inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString()); + } + @Override public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) { expression.updateStatisticsForMemoryAssigner(memoryAssigner); @@ -141,7 +148,7 @@ public class NegationExpression extends Expression { @Override public IntermediateLayer constructIntermediateLayer( long queryId, - UDTFPlan udtfPlan, + UDTFContext udtfContext, RawQueryInputLayer rawTimeSeriesInputLayer, Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, Map<Expression, TSDataType> expressionDataTypeMap, @@ -153,7 +160,7 @@ public class NegationExpression extends Expression { IntermediateLayer parentLayerPointReader = expression.constructIntermediateLayer( queryId, - udtfPlan, + udtfContext, rawTimeSeriesInputLayer, expressionIntermediateLayerMap, expressionDataTypeMap, diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java index a9c1052c01..adcb7dff6d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java +++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree; import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover; import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.query.expression.Expression; +import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; import org.apache.iotdb.db.query.udf.core.executor.UDTFExecutor; import org.apache.iotdb.db.query.udf.core.layer.IntermediateLayer; import org.apache.iotdb.db.query.udf.core.layer.LayerMemoryAssigner; @@ -116,6 +117,11 @@ public class TimeSeriesOperand extends Expression { // nothing to do } + @Override + public void bindInputLayerColumnIndexWithExpression(UDTFPlan udtfPlan) { + inputColumnIndex = udtfPlan.getReaderIndexByExpressionName(toString()); + } + @Override public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner memoryAssigner) { memoryAssigner.increaseExpressionReference(this); @@ -124,7 +130,7 @@ public class TimeSeriesOperand extends Expression { @Override public IntermediateLayer constructIntermediateLayer( long queryId, - UDTFPlan udtfPlan, + UDTFContext udtfContext, RawQueryInputLayer rawTimeSeriesInputLayer, Map<Expression, IntermediateLayer> expressionIntermediateLayerMap, Map<Expression, TSDataType> expressionDataTypeMap, @@ -134,7 +140,7 @@ public class TimeSeriesOperand extends Expression { float memoryBudgetInMB = memoryAssigner.assign(); LayerPointReader parentLayerPointReader = - rawTimeSeriesInputLayer.constructPointReader(udtfPlan.getReaderIndex(path.getFullPath())); + rawTimeSeriesInputLayer.constructPointReader(inputColumnIndex); expressionDataTypeMap.put(this, parentLayerPointReader.getDataType()); expressionIntermediateLayerMap.put( diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java new file mode 100644 index 0000000000..cb7d467403 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.query.udf.core.executor; + +import org.apache.iotdb.db.query.expression.ResultColumn; +import org.apache.iotdb.db.query.expression.unary.FunctionExpression; +import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager; + +import java.time.ZoneId; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class UDTFContext { + + protected final ZoneId zoneId; + + protected Map<String, UDTFExecutor> expressionName2Executor = new HashMap<>(); + + public UDTFContext(ZoneId zoneId) { + this.zoneId = zoneId; + } + + public void constructUdfExecutors(List<ResultColumn> resultColumns) { + for (ResultColumn resultColumn : resultColumns) { + resultColumn.getExpression().constructUdfExecutors(expressionName2Executor, zoneId); + } + } + + public void finalizeUDFExecutors(long queryId) { + try { + for (UDTFExecutor executor : expressionName2Executor.values()) { + executor.beforeDestroy(); + } + } finally { + UDFClassLoaderManager.getInstance().finalizeUDFQuery(queryId); + } + } + + public UDTFExecutor getExecutorByFunctionExpression(FunctionExpression functionExpression) { + return expressionName2Executor.get(functionExpression.getExpressionString()); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java index 184753489d..270482b1e6 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/DAGBuilder.java @@ -69,6 +69,13 @@ public class DAGBuilder { expressionDataTypeMap = new HashMap<>(); } + public DAGBuilder bindInputLayerColumnIndexWithExpression() { + for (Expression expression : resultColumnExpressions) { + expression.bindInputLayerColumnIndexWithExpression(udtfPlan); + } + return this; + } + public DAGBuilder buildLayerMemoryAssigner() { for (Expression expression : resultColumnExpressions) { expression.updateStatisticsForMemoryAssigner(memoryAssigner); @@ -83,7 +90,7 @@ public class DAGBuilder { resultColumnExpressions[i] .constructIntermediateLayer( queryId, - udtfPlan, + udtfPlan.getUdtfContext(), rawTimeSeriesInputLayer, expressionIntermediateLayerMap, expressionDataTypeMap, diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java index d462da438c..fec8fd71b3 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java @@ -94,6 +94,10 @@ public class RawQueryInputLayer { rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine()); } + public int getInputColumnCount() { + return dataTypes.length; + } + public LayerPointReader constructPointReader(int columnIndex) { return new InputLayerPointReader(columnIndex); }
