This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch tvf_handler in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2e6b9d075d1fc32542e8e49550d7d7adfd695431 Author: Chen YZ <[email protected]> AuthorDate: Wed Apr 23 00:53:23 2025 +0900 save --- .../iotdb/udf/table/ExcludeColumnExample.java | 11 +- .../org/apache/iotdb/udf/table/RepeatExample.java | 17 ++- .../org/apache/iotdb/udf/table/SplitExample.java | 23 +++- .../example/relational/MyErrorTableFunction.java | 11 +- .../udf/example/relational/MyExcludeColumn.java | 11 +- .../udf/example/relational/MyRepeatWithIndex.java | 17 ++- .../example/relational/MyRepeatWithoutIndex.java | 17 ++- .../db/query/udf/example/relational/MySplit.java | 23 +++- .../api/relational/EmptyTableFunctionHandle.java | 32 +++++ .../iotdb/udf/api/relational/TableFunction.java | 10 +- .../relational/table/MapTableFunctionHandle.java | 132 +++++++++++++++++++++ .../relational/table/TableFunctionAnalysis.java | 21 +++- .../api/relational/table/TableFunctionHandle.java | 19 +++ .../plan/planner/TableOperatorGenerator.java | 2 +- .../relational/analyzer/StatementAnalyzer.java | 1 + .../TableFunctionInvocationAnalysis.java | 8 ++ .../plan/relational/planner/RelationPlanner.java | 1 + .../rule/ImplementTableFunctionSource.java | 2 + .../rule/PruneTableFunctionProcessorColumns.java | 1 + .../PruneTableFunctionProcessorSourceColumns.java | 1 + .../relational/planner/node/TableFunctionNode.java | 35 +++++- .../planner/node/TableFunctionProcessorNode.java | 26 ++++ .../optimizations/UnaliasSymbolReferences.java | 3 + .../db/queryengine/plan/function/Exclude.java | 11 +- .../iotdb/db/queryengine/plan/function/Repeat.java | 18 ++- .../iotdb/db/queryengine/plan/function/Split.java | 23 +++- .../relational/tvf/CapacityTableFunction.java | 22 +++- .../relational/tvf/CumulateTableFunction.java | 24 +++- .../builtin/relational/tvf/HOPTableFunction.java | 26 +++- .../relational/tvf/SessionTableFunction.java | 17 ++- .../relational/tvf/TumbleTableFunction.java | 21 +++- .../relational/tvf/VariationTableFunction.java | 17 ++- 32 files changed, 546 insertions(+), 57 deletions(-) diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/table/ExcludeColumnExample.java b/example/udf/src/main/java/org/apache/iotdb/udf/table/ExcludeColumnExample.java index 99f9023299f..b1cdf810e41 100644 --- a/example/udf/src/main/java/org/apache/iotdb/udf/table/ExcludeColumnExample.java +++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/ExcludeColumnExample.java @@ -20,8 +20,10 @@ package org.apache.iotdb.udf.table; import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.EmptyTableFunctionHandle; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -85,11 +87,18 @@ public class ExcludeColumnExample implements TableFunction { return TableFunctionAnalysis.builder() .properColumnSchema(schemaBuilder.build()) .requiredColumns(TBL_PARAM, requiredColumns) + .handle(new EmptyTableFunctionHandle()) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new EmptyTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java index bfae7bcd383..3837f80e6e4 100644 --- a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java +++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java @@ -23,7 +23,9 @@ import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -81,22 +83,31 @@ public class RepeatExample implements TableFunction { throw new UDFArgumentNotValidException( "count argument for function repeat() must be positive"); } + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty(N_PARAM, count.getValue()); return TableFunctionAnalysis.builder() .properColumnSchema(DescribedSchema.builder().addField("repeat_index", Type.INT32).build()) .requiredColumns( TBL_PARAM, Collections.singletonList(0)) // per spec, function must require at least one column + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { - ScalarArgument count = (ScalarArgument) arguments.get("N"); + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new TableFunctionDataProcessor() { - private final int n = (int) count.getValue(); + private final int n = + (int) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(N_PARAM); private long recordIndex = 0; @Override diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java b/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java index 976ed970b2f..47f1faec68c 100644 --- a/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java +++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java @@ -21,7 +21,9 @@ package org.apache.iotdb.udf.table; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -73,17 +75,30 @@ public class SplitExample implements TableFunction { @Override public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException { DescribedSchema schema = DescribedSchema.builder().addField("output", Type.STRING).build(); - return TableFunctionAnalysis.builder().properColumnSchema(schema).build(); + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty( + INPUT_PARAMETER_NAME, ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue()); + handle.addProperty( + SPLIT_PARAMETER_NAME, ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + return TableFunctionAnalysis.builder().properColumnSchema(schema).handle(handle).build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionLeafProcessor getSplitProcessor() { return new SplitProcessor( - (String) ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue(), - (String) ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + (String) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(INPUT_PARAMETER_NAME), + (String) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(SPLIT_PARAMETER_NAME)); } }; } diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java index 177892e4e03..17d94f54a13 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java @@ -22,7 +22,9 @@ package org.apache.iotdb.db.query.udf.example.relational; import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -95,13 +97,20 @@ public class MyErrorTableFunction implements TableFunction { .properColumnSchema( DescribedSchema.builder().addField("proper_column", Type.INT32).build()) .requiredColumns("TIMECHO", Collections.singletonList(1)) + .handle(new MapTableFunctionHandle()) .build(); } throw new UDFArgumentNotValidException("unexpected argument value"); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyExcludeColumn.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyExcludeColumn.java index 35e35e085c8..49cc5a490d8 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyExcludeColumn.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyExcludeColumn.java @@ -21,7 +21,9 @@ package org.apache.iotdb.db.query.udf.example.relational; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -66,11 +68,18 @@ public class MyExcludeColumn implements TableFunction { return TableFunctionAnalysis.builder() .properColumnSchema(schemaBuilder.build()) .requiredColumns(TBL_PARAM, requiredColumns) + .handle(new MapTableFunctionHandle()) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java index 31cc06b44d0..62df7a7ea0d 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java @@ -23,7 +23,9 @@ import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -66,22 +68,31 @@ public class MyRepeatWithIndex implements TableFunction { throw new UDFArgumentNotValidException( "count argument for function repeat() must be positive"); } + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty(N_PARAM, count.getValue()); return TableFunctionAnalysis.builder() .properColumnSchema(DescribedSchema.builder().addField("repeat_index", Type.INT32).build()) .requiredColumns( TBL_PARAM, Collections.singletonList(0)) // per spec, function must require at least one column + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { - ScalarArgument count = (ScalarArgument) arguments.get("N"); + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new TableFunctionDataProcessor() { - private final int n = (int) count.getValue(); + private final int n = + (int) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(N_PARAM); private long recordIndex = 0; @Override diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java index 3b49c267c5c..b60dda00509 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java @@ -23,7 +23,9 @@ import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; @@ -65,21 +67,30 @@ public class MyRepeatWithoutIndex implements TableFunction { throw new UDFArgumentNotValidException( "count argument for function repeat() must be positive"); } + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty(N_PARAM, count.getValue()); return TableFunctionAnalysis.builder() .requiredColumns( TBL_PARAM, Collections.singletonList(0)) // per spec, function must require at least one column + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { - ScalarArgument count = (ScalarArgument) arguments.get("N"); + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new TableFunctionDataProcessor() { - private final int n = (int) count.getValue(); + private final int n = + (int) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(N_PARAM); private long recordIndex = 0; @Override diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java index d9a395eaa25..f243c259d92 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java @@ -21,7 +21,9 @@ package org.apache.iotdb.db.query.udf.example.relational; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -57,17 +59,30 @@ public class MySplit implements TableFunction { @Override public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException { DescribedSchema schema = DescribedSchema.builder().addField("output", Type.STRING).build(); - return TableFunctionAnalysis.builder().properColumnSchema(schema).build(); + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty( + INPUT_PARAMETER_NAME, ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue()); + handle.addProperty( + SPLIT_PARAMETER_NAME, ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + return TableFunctionAnalysis.builder().properColumnSchema(schema).handle(handle).build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionLeafProcessor getSplitProcessor() { return new SplitProcessor( - (String) ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue(), - (String) ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + (String) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(INPUT_PARAMETER_NAME), + (String) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(SPLIT_PARAMETER_NAME)); } }; } diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/EmptyTableFunctionHandle.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/EmptyTableFunctionHandle.java new file mode 100644 index 00000000000..8fed0b6acd1 --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/EmptyTableFunctionHandle.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf.api.relational; + +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; + +public class EmptyTableFunctionHandle implements TableFunctionHandle { + @Override + public byte[] serialize() { + return new byte[0]; + } + + @Override + public void deserialize(byte[] bytes) {} +} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java index 8c81840a4f8..a79c4336708 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java @@ -21,6 +21,7 @@ package org.apache.iotdb.udf.api.relational; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; @@ -81,6 +82,8 @@ public interface TableFunction extends SQLFunction { * <li>A description of proper columns. * <li>A map indicating which columns from the input table arguments are required for the * function to execute. + * <li>A TableFunctionExecutionInfo which stores all information necessary to execute the + * table function. * </ul> * </ul> * @@ -91,13 +94,16 @@ public interface TableFunction extends SQLFunction { */ TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException; + TableFunctionHandle createTableFunctionHandle(); + /** * This method is used to obtain a {@link TableFunctionProcessorProvider} that will be responsible * for creating processors to handle the transformation of input data into output table. The * provider is initialized with the validated arguments. * - * @param arguments a map of argument names to their corresponding {@link Argument} values + * @param tableFunctionHandle the object containing the execution information, which is generated + * in the {@link TableFunction#analyze} process. * @return a {@link TableFunctionProcessorProvider} for creating processors */ - TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments); + TableFunctionProcessorProvider getProcessorProvider(TableFunctionHandle tableFunctionHandle); } diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java new file mode 100644 index 00000000000..d4946e5e2c8 --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf.api.relational.table; + +import org.apache.iotdb.udf.api.type.Type; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class MapTableFunctionHandle implements TableFunctionHandle { + private static final Set<Class<?>> SUPPORT_VALUE_TYPE = + new HashSet<>( + Arrays.asList( + Integer.class, Long.class, Double.class, Float.class, String.class, Double.class)); + private final Map<String, Object> map = new HashMap<>(); + + public void addProperty(String key, Object value) { + if (!SUPPORT_VALUE_TYPE.contains(value.getClass())) { + throw new IllegalArgumentException("Unsupported value type."); + } + map.put(key, value); + } + + public Object getProperty(String key) { + return map.get(key); + } + + @Override + public byte[] serialize() { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putInt(map.size()); + for (Map.Entry<String, Object> entry : map.entrySet()) { + byte[] bytes = entry.getKey().getBytes(StandardCharsets.UTF_8); + buffer.putInt(bytes.length); + buffer.put(bytes); + if (entry.getValue() instanceof Long) { + buffer.put(Type.INT64.getType()); + buffer.putLong((Long) entry.getValue()); + } else if (entry.getValue() instanceof Integer) { + buffer.put(Type.INT32.getType()); + buffer.putInt((Integer) entry.getValue()); + } else if (entry.getValue() instanceof Double) { + buffer.put(Type.DOUBLE.getType()); + buffer.putDouble((Double) entry.getValue()); + } else if (entry.getValue() instanceof Float) { + buffer.put(Type.FLOAT.getType()); + buffer.putFloat((Float) entry.getValue()); + } else if (entry.getValue() instanceof Boolean) { + buffer.put(Type.BOOLEAN.getType()); + buffer.put(Boolean.TRUE.equals(entry.getValue()) ? (byte) 1 : (byte) 0); + } else if (entry.getValue() instanceof String) { + buffer.put(Type.STRING.getType()); + bytes = ((String) entry.getValue()).getBytes(StandardCharsets.UTF_8); + buffer.putInt(bytes.length); + buffer.put(bytes); + } + } + return buffer.array(); + } + + @Override + public void deserialize(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + int size = buffer.getInt(); + for (int i = 0; i < size; i++) { + byte[] b = new byte[buffer.getInt()]; + buffer.get(b); + String key = new String(b, StandardCharsets.UTF_8); + Type type = Type.valueOf(buffer.get()); + switch (type) { + case BOOLEAN: + map.put(key, buffer.get() != 0); + break; + case INT32: + map.put(key, buffer.getInt()); + break; + case INT64: + map.put(key, buffer.getLong()); + break; + case FLOAT: + map.put(key, buffer.getFloat()); + break; + case DOUBLE: + map.put(key, buffer.getDouble()); + break; + case STRING: + b = new byte[buffer.getInt()]; + buffer.get(b); + map.put(key, new String(b, StandardCharsets.UTF_8)); + break; + default: + throw new IllegalArgumentException("Unknown type: " + type); + } + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("MapTableFunctionHandle{"); + for (Map.Entry<String, Object> entry : map.entrySet()) { + sb.append(entry.getKey()).append("=").append(entry.getValue()).append(", "); + } + if (sb.length() > 2) { + sb.setLength(sb.length() - 2); // remove last comma and space + } + sb.append('}'); + return sb.toString(); + } +} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java index bff404c1623..630fa814fca 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java @@ -58,13 +58,17 @@ public class TableFunctionAnalysis { */ private final boolean requireRecordSnapshot; + private final TableFunctionHandle handle; + private TableFunctionAnalysis( Optional<DescribedSchema> properColumnSchema, Map<String, List<Integer>> requiredColumns, - boolean requiredRecordSnapshot) { + boolean requiredRecordSnapshot, + TableFunctionHandle handle) { this.properColumnSchema = requireNonNull(properColumnSchema, "returnedType is null"); this.requiredColumns = requiredColumns; this.requireRecordSnapshot = requiredRecordSnapshot; + this.handle = requireNonNull(handle, "TableFunctionHandle is null"); } public Optional<DescribedSchema> getProperColumnSchema() { @@ -79,6 +83,10 @@ public class TableFunctionAnalysis { return requireRecordSnapshot; } + public TableFunctionHandle getTableFunctionHandle() { + return handle; + } + public static Builder builder() { return new Builder(); } @@ -87,6 +95,7 @@ public class TableFunctionAnalysis { private DescribedSchema properColumnSchema; private final Map<String, List<Integer>> requiredColumns = new HashMap<>(); private boolean requireRecordSnapshot = true; + private TableFunctionHandle executionInfo; private Builder() {} @@ -105,9 +114,17 @@ public class TableFunctionAnalysis { return this; } + public Builder handle(TableFunctionHandle executionInfo) { + this.executionInfo = executionInfo; + return this; + } + public TableFunctionAnalysis build() { return new TableFunctionAnalysis( - Optional.ofNullable(properColumnSchema), requiredColumns, requireRecordSnapshot); + Optional.ofNullable(properColumnSchema), + requiredColumns, + requireRecordSnapshot, + executionInfo); } } } diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionHandle.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionHandle.java new file mode 100644 index 00000000000..61b4f1fbf1d --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionHandle.java @@ -0,0 +1,19 @@ +package org.apache.iotdb.udf.api.relational.table; + +/** + * An area to store all information necessary to execute the table function, gathered at analysis + * time + */ +public interface TableFunctionHandle { + /** + * Serialize your state into byte array. The order of serialization must be consistent with + * deserialization. + */ + byte[] serialize(); + + /** + * Deserialize byte array into your state. The order of deserialization must be consistent with + * serialization. + */ + void deserialize(byte[] bytes); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index ee6a13a006c..3bc8f5a7e88 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -2862,7 +2862,7 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution TableFunctionProcessorNode node, LocalExecutionPlanContext context) { TableFunction tableFunction = metadata.getTableFunction(node.getName()); TableFunctionProcessorProvider processorProvider = - tableFunction.getProcessorProvider(node.getArguments()); + tableFunction.getProcessorProvider(node.getTableFunctionHandle()); if (node.getChildren().isEmpty()) { List<TSDataType> outputDataTypes = node.getOutputSymbols().stream() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index ec8279853bd..d248448d018 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -4140,6 +4140,7 @@ public class StatementAnalyzer { new TableFunctionInvocationAnalysis( node.getName().toString(), argumentsAnalysis.getPassedArguments(), + functionAnalysis.getTableFunctionHandle(), orderedTableArguments.build(), requiredColumns, properSchema.map(describedSchema -> describedSchema.getFields().size()).orElse(0), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/tablefunction/TableFunctionInvocationAnalysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/tablefunction/TableFunctionInvocationAnalysis.java index 73900573fa9..1740ac2120a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/tablefunction/TableFunctionInvocationAnalysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/tablefunction/TableFunctionInvocationAnalysis.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.analyzer.tablefunction; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import com.google.common.collect.ImmutableList; @@ -29,6 +30,7 @@ import java.util.Map; public class TableFunctionInvocationAnalysis { private final String functionName; private final Map<String, Argument> passedArguments; + private final TableFunctionHandle tableFunctionHandle; private final List<TableArgumentAnalysis> tableArgumentAnalyses; private final Map<String, List<Integer>> requiredColumns; private final int properColumnsCount; @@ -37,12 +39,14 @@ public class TableFunctionInvocationAnalysis { public TableFunctionInvocationAnalysis( String name, Map<String, Argument> passedArguments, + TableFunctionHandle tableFunctionHandle, ImmutableList<TableArgumentAnalysis> tableArgumentAnalyses, Map<String, List<Integer>> requiredColumns, int properColumnsCount, boolean requiredRecordSnapshot) { this.functionName = name; this.passedArguments = passedArguments; + this.tableFunctionHandle = tableFunctionHandle; this.tableArgumentAnalyses = tableArgumentAnalyses; this.requiredColumns = requiredColumns; this.properColumnsCount = properColumnsCount; @@ -65,6 +69,10 @@ public class TableFunctionInvocationAnalysis { return passedArguments; } + public TableFunctionHandle getTableFunctionHandle() { + return tableFunctionHandle; + } + public int getProperColumnsCount() { return properColumnsCount; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index 4d14ebaf06b..a5f2a79a3fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -919,6 +919,7 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { idAllocator.genPlanNodeId(), functionAnalysis.getFunctionName(), functionAnalysis.getPassedArguments(), + functionAnalysis.getTableFunctionHandle(), properOutputs, sources.build(), sourceProperties.build()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java index e1079d9d5b3..f40e7c80e7d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java @@ -74,6 +74,7 @@ public class ImplementTableFunctionSource implements Rule<TableFunctionNode> { Optional.empty(), false, node.getArguments(), + node.getTableFunctionHandle(), false)); } else if (node.getChildren().size() == 1) { // Single source does not require pre-processing. @@ -126,6 +127,7 @@ public class ImplementTableFunctionSource implements Rule<TableFunctionNode> { sourceProperties.getDataOrganizationSpecification(), sourceProperties.isRowSemantics(), node.getArguments(), + node.getTableFunctionHandle(), sourceProperties.isRequireRecordSnapshot())); } else { // we don't support multiple source now. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java index 60792dad57d..3d65469f02c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java @@ -83,6 +83,7 @@ public class PruneTableFunctionProcessorColumns node.getDataOrganizationSpecification(), node.isRowSemantic(), node.getArguments(), + node.getTableFunctionHandle(), node.isRequireRecordSnapshot())); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java index 6de6efb55fb..a1d55cf492c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java @@ -92,6 +92,7 @@ public class PruneTableFunctionProcessorSourceColumns implements Rule<TableFunct node.getDataOrganizationSpecification(), node.isRowSemantic(), node.getArguments(), + node.getTableFunctionHandle(), node.isRequireRecordSnapshot()))) .orElse(Result.empty()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java index e1326d97a9c..65034f01280 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java @@ -24,8 +24,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl; import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import com.google.common.collect.ImmutableList; @@ -48,6 +50,7 @@ public class TableFunctionNode extends MultiChildProcessNode { private final String name; private final Map<String, Argument> arguments; + private final TableFunctionHandle tableFunctionHandle; private final List<Symbol> properOutputs; private final List<TableArgumentProperties> tableArgumentProperties; @@ -55,12 +58,14 @@ public class TableFunctionNode extends MultiChildProcessNode { PlanNodeId id, String name, Map<String, Argument> arguments, + TableFunctionHandle tableFunctionHandle, List<Symbol> properOutputs, List<PlanNode> children, List<TableArgumentProperties> tableArgumentProperties) { super(id, children); this.name = requireNonNull(name, "name is null"); this.arguments = ImmutableMap.copyOf(arguments); + this.tableFunctionHandle = tableFunctionHandle; this.properOutputs = ImmutableList.copyOf(properOutputs); this.tableArgumentProperties = ImmutableList.copyOf(tableArgumentProperties); } @@ -69,11 +74,13 @@ public class TableFunctionNode extends MultiChildProcessNode { PlanNodeId id, String name, Map<String, Argument> arguments, + TableFunctionHandle tableFunctionHandle, List<Symbol> properOutputs, List<TableArgumentProperties> tableArgumentProperties) { super(id); this.name = requireNonNull(name, "name is null"); this.arguments = ImmutableMap.copyOf(arguments); + this.tableFunctionHandle = tableFunctionHandle; this.properOutputs = ImmutableList.copyOf(properOutputs); this.tableArgumentProperties = ImmutableList.copyOf(tableArgumentProperties); } @@ -86,6 +93,10 @@ public class TableFunctionNode extends MultiChildProcessNode { return arguments; } + public TableFunctionHandle getTableFunctionHandle() { + return tableFunctionHandle; + } + public List<Symbol> getProperOutputs() { return properOutputs; } @@ -96,7 +107,8 @@ public class TableFunctionNode extends MultiChildProcessNode { @Override public PlanNode clone() { - return new TableFunctionNode(id, name, arguments, properOutputs, tableArgumentProperties); + return new TableFunctionNode( + id, name, arguments, tableFunctionHandle, properOutputs, tableArgumentProperties); } @Override @@ -130,7 +142,13 @@ public class TableFunctionNode extends MultiChildProcessNode { public PlanNode replaceChildren(List<PlanNode> newSources) { checkArgument(children.size() == newSources.size(), "wrong number of new children"); return new TableFunctionNode( - getPlanNodeId(), name, arguments, properOutputs, newSources, tableArgumentProperties); + getPlanNodeId(), + name, + arguments, + tableFunctionHandle, + properOutputs, + newSources, + tableArgumentProperties); } @Override @@ -142,6 +160,9 @@ public class TableFunctionNode extends MultiChildProcessNode { ReadWriteIOUtils.write(entry.getKey(), byteBuffer); entry.getValue().serialize(byteBuffer); } + byte[] bytes = tableFunctionHandle.serialize(); + ReadWriteIOUtils.write(bytes.length, byteBuffer); + ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), byteBuffer); ReadWriteIOUtils.write(properOutputs.size(), byteBuffer); properOutputs.forEach(symbol -> Symbol.serialize(symbol, byteBuffer)); ReadWriteIOUtils.write(tableArgumentProperties.size(), byteBuffer); @@ -170,6 +191,9 @@ public class TableFunctionNode extends MultiChildProcessNode { ReadWriteIOUtils.write(entry.getKey(), stream); entry.getValue().serialize(stream); } + byte[] bytes = tableFunctionHandle.serialize(); + ReadWriteIOUtils.write(bytes.length, stream); + ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), stream); ReadWriteIOUtils.write(properOutputs.size(), stream); for (Symbol symbol : properOutputs) { Symbol.serialize(symbol, stream); @@ -201,6 +225,12 @@ public class TableFunctionNode extends MultiChildProcessNode { arguments.put(key, value); } size = ReadWriteIOUtils.readInt(byteBuffer); + byte[] bytes = ReadWriteIOUtils.readBytes(byteBuffer, size); + TableFunctionHandle tableFunctionHandle = + new TableMetadataImpl().getTableFunction(name).createTableFunctionHandle(); + tableFunctionHandle.deserialize(bytes); + // TODO: how to get table function handle instance more elegantly? + size = ReadWriteIOUtils.readInt(byteBuffer); ImmutableList.Builder<Symbol> properOutputs = ImmutableList.builder(); for (int i = 0; i < size; i++) { properOutputs.add(Symbol.deserialize(byteBuffer)); @@ -238,6 +268,7 @@ public class TableFunctionNode extends MultiChildProcessNode { planNodeId, name, arguments.build(), + tableFunctionHandle, properOutputs.build(), tableArgumentProperties.build()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java index 57184818de7..260578f5f70 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java @@ -24,8 +24,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl; import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import com.google.common.collect.ImmutableList; @@ -65,6 +67,8 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { private final Map<String, Argument> arguments; + private final TableFunctionHandle tableFunctionHandle; + private final boolean requireRecordSnapshot; public TableFunctionProcessorNode( @@ -77,6 +81,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { Optional<DataOrganizationSpecification> dataOrganizationSpecification, boolean rowSemantic, Map<String, Argument> arguments, + TableFunctionHandle tableFunctionHandle, boolean requireRecordSnapshot) { super(id, source.orElse(null)); this.name = requireNonNull(name, "name is null"); @@ -87,6 +92,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { requireNonNull(dataOrganizationSpecification, "specification is null"); this.rowSemantic = rowSemantic; this.arguments = ImmutableMap.copyOf(arguments); + this.tableFunctionHandle = tableFunctionHandle; this.requireRecordSnapshot = requireRecordSnapshot; } @@ -99,6 +105,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { Optional<DataOrganizationSpecification> dataOrganizationSpecification, boolean rowSemantic, Map<String, Argument> arguments, + TableFunctionHandle tableFunctionHandle, boolean requireRecordSnapshot) { super(id); this.name = requireNonNull(name, "name is null"); @@ -109,6 +116,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { requireNonNull(dataOrganizationSpecification, "specification is null"); this.rowSemantic = rowSemantic; this.arguments = ImmutableMap.copyOf(arguments); + this.tableFunctionHandle = tableFunctionHandle; this.requireRecordSnapshot = requireRecordSnapshot; } @@ -140,6 +148,10 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { return arguments; } + public TableFunctionHandle getTableFunctionHandle() { + return tableFunctionHandle; + } + public boolean isRequireRecordSnapshot() { return requireRecordSnapshot; } @@ -155,6 +167,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { dataOrganizationSpecification, rowSemantic, arguments, + tableFunctionHandle, requireRecordSnapshot); } @@ -210,6 +223,9 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { ReadWriteIOUtils.write(key, byteBuffer); value.serialize(byteBuffer); }); + byte[] bytes = tableFunctionHandle.serialize(); + ReadWriteIOUtils.write(bytes.length, byteBuffer); + ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), byteBuffer); ReadWriteIOUtils.write(requireRecordSnapshot, byteBuffer); } @@ -239,6 +255,9 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { ReadWriteIOUtils.write(entry.getKey(), stream); entry.getValue().serialize(stream); } + byte[] bytes = tableFunctionHandle.serialize(); + ReadWriteIOUtils.write(bytes.length, stream); + ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), stream); ReadWriteIOUtils.write(requireRecordSnapshot, stream); } @@ -272,6 +291,11 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { Argument value = Argument.deserialize(byteBuffer); arguments.put(key, value); } + byte[] bytes = ReadWriteIOUtils.readBytes(byteBuffer, size); + TableFunctionHandle tableFunctionHandle = + new TableMetadataImpl().getTableFunction(name).createTableFunctionHandle(); + tableFunctionHandle.deserialize(bytes); + // TODO: how to get table function handle instance more elegantly? boolean requireRecordSnapshot = ReadWriteIOUtils.readBoolean(byteBuffer); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); @@ -284,6 +308,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { dataOrganizationSpecification, rowSemantic, arguments, + tableFunctionHandle, requireRecordSnapshot); } @@ -301,6 +326,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { dataOrganizationSpecification, rowSemantic, arguments, + tableFunctionHandle, requireRecordSnapshot); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index d12db4d3034..1f13f5b893a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -871,6 +871,7 @@ public class UnaliasSymbolReferences implements PlanOptimizer { node.getPlanNodeId(), node.getName(), node.getArguments(), + node.getTableFunctionHandle(), newProperOutputs, newSources.build(), newTableArgumentProperties.build()), @@ -894,6 +895,7 @@ public class UnaliasSymbolReferences implements PlanOptimizer { Optional.empty(), node.isRowSemantic(), node.getArguments(), + node.getTableFunctionHandle(), node.isRequireRecordSnapshot()), mapping); } @@ -931,6 +933,7 @@ public class UnaliasSymbolReferences implements PlanOptimizer { newSpecification, node.isRowSemantic(), node.getArguments(), + node.getTableFunctionHandle(), node.isRequireRecordSnapshot()); return new PlanAndMappings(rewrittenTableFunctionProcessor, mapping); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Exclude.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Exclude.java index 8fd86f684bf..bc5289da14a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Exclude.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Exclude.java @@ -20,8 +20,10 @@ package org.apache.iotdb.db.queryengine.plan.function; import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.EmptyTableFunctionHandle; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -66,11 +68,18 @@ public class Exclude implements TableFunction { return TableFunctionAnalysis.builder() .properColumnSchema(schemaBuilder.build()) .requiredColumns(TBL_PARAM, requiredColumns) + .handle(new EmptyTableFunctionHandle()) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new EmptyTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java index 0682fd83d53..766fdffcd3d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java @@ -23,7 +23,9 @@ import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -62,22 +64,32 @@ public class Repeat implements TableFunction { throw new UDFArgumentNotValidException( "count argument for function repeat() must be positive"); } + + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty(N_PARAM, count.getValue()); return TableFunctionAnalysis.builder() .properColumnSchema(DescribedSchema.builder().addField("repeat_index", Type.INT32).build()) .requiredColumns( TBL_PARAM, Collections.singletonList(0)) // per spec, function must require at least one column + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { - ScalarArgument count = (ScalarArgument) arguments.get("N"); + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new TableFunctionDataProcessor() { - private final int n = (int) count.getValue(); + private final int n = + (int) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(N_PARAM); private long recordIndex = 0; @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java index fede9cae8a7..ef86ccb3028 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java @@ -21,7 +21,9 @@ package org.apache.iotdb.db.queryengine.plan.function; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -57,17 +59,30 @@ public class Split implements TableFunction { @Override public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException { DescribedSchema schema = DescribedSchema.builder().addField("output", Type.STRING).build(); - return TableFunctionAnalysis.builder().properColumnSchema(schema).build(); + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty( + INPUT_PARAMETER_NAME, ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue()); + handle.addProperty( + SPLIT_PARAMETER_NAME, ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + return TableFunctionAnalysis.builder().properColumnSchema(schema).handle(handle).build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionLeafProcessor getSplitProcessor() { return new SplitProcessor( - (String) ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue(), - (String) ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + (String) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(INPUT_PARAMETER_NAME), + (String) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(SPLIT_PARAMETER_NAME)); } }; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java index f5d84c37ada..630273efbb0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java @@ -22,7 +22,9 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -60,33 +62,43 @@ public class CapacityTableFunction implements TableFunction { if (size <= 0) { throw new UDFException("Size must be greater than 0"); } + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty(SIZE_PARAMETER_NAME, size); return TableFunctionAnalysis.builder() .properColumnSchema( new DescribedSchema.Builder().addField("window_index", Type.INT64).build()) .requireRecordSnapshot(false) .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(0)) + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { - long sz = (long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue(); + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { + long sz = + (long) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(SIZE_PARAMETER_NAME); return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { - return new CountDataProcessor(sz); + return new CapacityDataProcessor(sz); } }; } - private static class CountDataProcessor implements TableFunctionDataProcessor { + private static class CapacityDataProcessor implements TableFunctionDataProcessor { private final long size; private long currentStartIndex = 0; private long curIndex = 0; private long windowIndex = 0; - public CountDataProcessor(long size) { + public CapacityDataProcessor(long size) { this.size = size; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java index acb3e588f05..b37ea8450d5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java @@ -22,7 +22,9 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -94,24 +96,36 @@ public class CumulateTableFunction implements TableFunction { .addField("window_start", Type.TIMESTAMP) .addField("window_end", Type.TIMESTAMP) .build(); - + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty(STEP_PARAMETER_NAME, step); + handle.addProperty(SIZE_PARAMETER_NAME, size); + handle.addProperty( + ORIGIN_PARAMETER_NAME, ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue()); // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) .requireRecordSnapshot(false) .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex)) + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new CumulateDataProcessor( - (Long) ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(STEP_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); + (Long) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(ORIGIN_PARAMETER_NAME), + (Long) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(STEP_PARAMETER_NAME), + (Long) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(SIZE_PARAMETER_NAME)); } }; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java index b2178c5805a..382fd349dba 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java @@ -21,7 +21,9 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -84,24 +86,38 @@ public class HOPTableFunction implements TableFunction { .addField("window_start", Type.TIMESTAMP) .addField("window_end", Type.TIMESTAMP) .build(); - + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty( + ORIGIN_PARAMETER_NAME, ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue()); + handle.addProperty( + SLIDE_PARAMETER_NAME, ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue()); + handle.addProperty( + SIZE_PARAMETER_NAME, ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) .requireRecordSnapshot(false) .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex)) + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { + MapTableFunctionHandle mapTableFunctionHandle = (MapTableFunctionHandle) tableFunctionHandle; return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new HOPDataProcessor( - (Long) ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); + (Long) mapTableFunctionHandle.getProperty(ORIGIN_PARAMETER_NAME), + (Long) mapTableFunctionHandle.getProperty(SLIDE_PARAMETER_NAME), + (Long) mapTableFunctionHandle.getProperty(SIZE_PARAMETER_NAME)); } }; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java index 9a7ec4577ab..4133d36c8da 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java @@ -22,7 +22,9 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -75,17 +77,28 @@ public class SessionTableFunction implements TableFunction { .addField("window_end", Type.TIMESTAMP) .build(); + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty( + GAP_PARAMETER_NAME, ((ScalarArgument) arguments.get(GAP_PARAMETER_NAME)).getValue()); // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) .requireRecordSnapshot(false) .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex)) + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { - long gap = (long) ((ScalarArgument) arguments.get(GAP_PARAMETER_NAME)).getValue(); + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { + long gap = + (long) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(GAP_PARAMETER_NAME); return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java index a239c694129..161e2b040c2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java @@ -22,7 +22,9 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -83,22 +85,35 @@ public class TumbleTableFunction implements TableFunction { .addField("window_end", Type.TIMESTAMP) .build(); + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty( + ORIGIN_PARAMETER_NAME, ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue()); + handle.addProperty( + SIZE_PARAMETER_NAME, ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) .requireRecordSnapshot(false) .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex)) + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new TumbleDataProcessor( - (Long) ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); + (Long) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(ORIGIN_PARAMETER_NAME), + (Long) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(SIZE_PARAMETER_NAME)); } }; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java index 189ef0bc469..6faf47289b8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java @@ -22,7 +22,9 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -76,16 +78,27 @@ public class VariationTableFunction implements TableFunction { DescribedSchema properColumnSchema = new DescribedSchema.Builder().addField("window_index", Type.INT64).build(); // outputColumnSchema + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty( + DELTA_PARAMETER_NAME, ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue()); return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) .requireRecordSnapshot(false) .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex)) + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { - double delta = (double) ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue(); + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { + double delta = + (double) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(DELTA_PARAMETER_NAME); return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() {
