This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new db3a3845475 Add TableFunctionHandle for TVF
db3a3845475 is described below
commit db3a38454750a316d8a3be9479128a5b8a6a8236
Author: Chen YZ <[email protected]>
AuthorDate: Sun Apr 27 08:43:23 2025 +0900
Add TableFunctionHandle for TVF
---
.../iotdb/udf/table/ExcludeColumnExample.java | 11 +-
.../org/apache/iotdb/udf/table/RepeatExample.java | 17 +-
.../org/apache/iotdb/udf/table/SplitExample.java | 27 ++-
.../example/relational/MyErrorTableFunction.java | 15 +-
.../udf/example/relational/MyExcludeColumn.java | 11 +-
.../udf/example/relational/MyRepeatWithIndex.java | 17 +-
.../example/relational/MyRepeatWithoutIndex.java | 17 +-
.../udf/example/relational/MySelectColumn.java | 11 +-
.../db/query/udf/example/relational/MySplit.java | 27 ++-
.../api/relational/EmptyTableFunctionHandle.java | 37 ++++
.../iotdb/udf/api/relational/TableFunction.java | 10 +-
.../relational/table/MapTableFunctionHandle.java | 186 +++++++++++++++++++++
.../relational/table/TableFunctionAnalysis.java | 21 ++-
.../api/relational/table/TableFunctionHandle.java | 38 +++++
.../plan/planner/TableOperatorGenerator.java | 2 +-
.../plan/planner/plan/node/PlanGraphPrinter.java | 30 +---
.../relational/analyzer/StatementAnalyzer.java | 1 +
.../TableFunctionInvocationAnalysis.java | 8 +
.../plan/relational/planner/RelationPlanner.java | 2 +-
.../rule/ImplementTableFunctionSource.java | 4 +-
.../rule/PruneTableFunctionProcessorColumns.java | 2 +-
.../PruneTableFunctionProcessorSourceColumns.java | 2 +-
.../relational/planner/node/TableFunctionNode.java | 57 ++++---
.../planner/node/TableFunctionProcessorNode.java | 51 +++---
.../optimizations/UnaliasSymbolReferences.java | 6 +-
.../db/queryengine/plan/function/Exclude.java | 11 +-
.../iotdb/db/queryengine/plan/function/Repeat.java | 18 +-
.../iotdb/db/queryengine/plan/function/Split.java | 27 ++-
.../relational/analyzer/TableFunctionTest.java | 92 +++++-----
.../assertions/TableFunctionProcessorMatcher.java | 178 ++------------------
.../relational/tvf/CapacityTableFunction.java | 22 ++-
.../relational/tvf/CumulateTableFunction.java | 27 ++-
.../builtin/relational/tvf/HOPTableFunction.java | 31 +++-
.../relational/tvf/SessionTableFunction.java | 19 ++-
.../relational/tvf/TumbleTableFunction.java | 25 ++-
.../relational/tvf/VariationTableFunction.java | 20 ++-
36 files changed, 717 insertions(+), 363 deletions(-)
diff --git
a/example/udf/src/main/java/org/apache/iotdb/udf/table/ExcludeColumnExample.java
b/example/udf/src/main/java/org/apache/iotdb/udf/table/ExcludeColumnExample.java
index 99f9023299f..b1cdf810e41 100644
---
a/example/udf/src/main/java/org/apache/iotdb/udf/table/ExcludeColumnExample.java
+++
b/example/udf/src/main/java/org/apache/iotdb/udf/table/ExcludeColumnExample.java
@@ -20,8 +20,10 @@
package org.apache.iotdb.udf.table;
import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.EmptyTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -85,11 +87,18 @@ public class ExcludeColumnExample implements TableFunction {
return TableFunctionAnalysis.builder()
.properColumnSchema(schemaBuilder.build())
.requiredColumns(TBL_PARAM, requiredColumns)
+ .handle(new EmptyTableFunctionHandle())
.build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new EmptyTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
diff --git
a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java
b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java
index 51918dd12b7..7360900ba80 100644
--- a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java
+++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java
@@ -23,7 +23,9 @@ import
org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -81,22 +83,31 @@ public class RepeatExample implements TableFunction {
throw new UDFArgumentNotValidException(
"count argument for function repeat() must be positive");
}
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder().addProperty(N_PARAM,
count.getValue()).build();
return TableFunctionAnalysis.builder()
.properColumnSchema(DescribedSchema.builder().addField("repeat_index",
Type.INT32).build())
.requiredColumns(
TBL_PARAM,
Collections.singletonList(0)) // per spec, function must require
at least one column
+ .handle(handle)
.build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
- ScalarArgument count = (ScalarArgument) arguments.get("N");
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
return new TableFunctionDataProcessor() {
- private final int n = (int) count.getValue();
+ private final int n =
+ (int) ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(N_PARAM);
private long recordIndex = 0;
@Override
diff --git
a/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java
b/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java
index 976ed970b2f..03c46ad4a08 100644
--- a/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java
+++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.udf.table;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -73,17 +75,34 @@ public class SplitExample implements TableFunction {
@Override
public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws
UDFException {
DescribedSchema schema = DescribedSchema.builder().addField("output",
Type.STRING).build();
- return TableFunctionAnalysis.builder().properColumnSchema(schema).build();
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder()
+ .addProperty(
+ INPUT_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(INPUT_PARAMETER_NAME)).getValue())
+ .addProperty(
+ SPLIT_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(SPLIT_PARAMETER_NAME)).getValue())
+ .build();
+ return
TableFunctionAnalysis.builder().properColumnSchema(schema).handle(handle).build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionLeafProcessor getSplitProcessor() {
return new SplitProcessor(
- (String) ((ScalarArgument)
arguments.get(INPUT_PARAMETER_NAME)).getValue(),
- (String) ((ScalarArgument)
arguments.get(SPLIT_PARAMETER_NAME)).getValue());
+ (String)
+ ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(INPUT_PARAMETER_NAME),
+ (String)
+ ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(SPLIT_PARAMETER_NAME));
}
};
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java
index 177892e4e03..1dd44653ecb 100644
---
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java
+++
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.db.query.udf.example.relational;
import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -67,6 +69,7 @@ public class MyErrorTableFunction implements TableFunction {
return TableFunctionAnalysis.builder()
.properColumnSchema(
DescribedSchema.builder().addField("proper_column",
Type.INT32).build())
+ .handle(new MapTableFunctionHandle())
.build();
} else if (nValue == 1) {
// set empty required columns
@@ -74,6 +77,7 @@ public class MyErrorTableFunction implements TableFunction {
.properColumnSchema(
DescribedSchema.builder().addField("proper_column",
Type.INT32).build())
.requiredColumns(TBL_PARAM, Collections.emptyList())
+ .handle(new MapTableFunctionHandle())
.build();
} else if (nValue == 2) {
// set negative required columns
@@ -81,6 +85,7 @@ public class MyErrorTableFunction implements TableFunction {
.properColumnSchema(
DescribedSchema.builder().addField("proper_column",
Type.INT32).build())
.requiredColumns(TBL_PARAM, Collections.singletonList(-1))
+ .handle(new MapTableFunctionHandle())
.build();
} else if (nValue == 3) {
// set required columns out of bound (0~10)
@@ -88,6 +93,7 @@ public class MyErrorTableFunction implements TableFunction {
.properColumnSchema(
DescribedSchema.builder().addField("proper_column",
Type.INT32).build())
.requiredColumns(TBL_PARAM, IntStream.range(0,
11).boxed().collect(Collectors.toList()))
+ .handle(new MapTableFunctionHandle())
.build();
} else if (nValue == 4) {
// specify required columns to unknown table
@@ -95,13 +101,20 @@ public class MyErrorTableFunction implements TableFunction
{
.properColumnSchema(
DescribedSchema.builder().addField("proper_column",
Type.INT32).build())
.requiredColumns("TIMECHO", Collections.singletonList(1))
+ .handle(new MapTableFunctionHandle())
.build();
}
throw new UDFArgumentNotValidException("unexpected argument value");
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
diff --git
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyExcludeColumn.java
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyExcludeColumn.java
index 35e35e085c8..49cc5a490d8 100644
---
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyExcludeColumn.java
+++
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyExcludeColumn.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.query.udf.example.relational;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -66,11 +68,18 @@ public class MyExcludeColumn implements TableFunction {
return TableFunctionAnalysis.builder()
.properColumnSchema(schemaBuilder.build())
.requiredColumns(TBL_PARAM, requiredColumns)
+ .handle(new MapTableFunctionHandle())
.build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
diff --git
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java
index 31cc06b44d0..8fe032bfb5e 100644
---
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java
+++
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java
@@ -23,7 +23,9 @@ import
org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -66,22 +68,31 @@ public class MyRepeatWithIndex implements TableFunction {
throw new UDFArgumentNotValidException(
"count argument for function repeat() must be positive");
}
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder().addProperty(N_PARAM,
count.getValue()).build();
return TableFunctionAnalysis.builder()
.properColumnSchema(DescribedSchema.builder().addField("repeat_index",
Type.INT32).build())
.requiredColumns(
TBL_PARAM,
Collections.singletonList(0)) // per spec, function must require
at least one column
+ .handle(handle)
.build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
- ScalarArgument count = (ScalarArgument) arguments.get("N");
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
return new TableFunctionDataProcessor() {
- private final int n = (int) count.getValue();
+ private final int n =
+ (int) ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(N_PARAM);
private long recordIndex = 0;
@Override
diff --git
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java
index 3b49c267c5c..42b2e331dd3 100644
---
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java
+++
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java
@@ -23,7 +23,9 @@ import
org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
@@ -65,21 +67,30 @@ public class MyRepeatWithoutIndex implements TableFunction {
throw new UDFArgumentNotValidException(
"count argument for function repeat() must be positive");
}
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder().addProperty(N_PARAM,
count.getValue()).build();
return TableFunctionAnalysis.builder()
.requiredColumns(
TBL_PARAM,
Collections.singletonList(0)) // per spec, function must require
at least one column
+ .handle(handle)
.build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
- ScalarArgument count = (ScalarArgument) arguments.get("N");
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
return new TableFunctionDataProcessor() {
- private final int n = (int) count.getValue();
+ private final int n =
+ (int) ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(N_PARAM);
private long recordIndex = 0;
@Override
diff --git
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySelectColumn.java
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySelectColumn.java
index 27409032702..6cc0d683e10 100644
---
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySelectColumn.java
+++
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySelectColumn.java
@@ -20,8 +20,10 @@
package org.apache.iotdb.db.query.udf.example.relational;
import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.EmptyTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -66,11 +68,18 @@ public class MySelectColumn implements TableFunction {
return TableFunctionAnalysis.builder()
.properColumnSchema(schemaBuilder.build())
.requiredColumns(TBL_PARAM, requiredColumns)
+ .handle(new EmptyTableFunctionHandle())
.build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new EmptyTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
diff --git
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java
index d9a395eaa25..6039b8df75e 100644
---
a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java
+++
b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.query.udf.example.relational;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -57,17 +59,34 @@ public class MySplit implements TableFunction {
@Override
public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws
UDFException {
DescribedSchema schema = DescribedSchema.builder().addField("output",
Type.STRING).build();
- return TableFunctionAnalysis.builder().properColumnSchema(schema).build();
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder()
+ .addProperty(
+ INPUT_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(INPUT_PARAMETER_NAME)).getValue())
+ .addProperty(
+ SPLIT_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(SPLIT_PARAMETER_NAME)).getValue())
+ .build();
+ return
TableFunctionAnalysis.builder().properColumnSchema(schema).handle(handle).build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionLeafProcessor getSplitProcessor() {
return new SplitProcessor(
- (String) ((ScalarArgument)
arguments.get(INPUT_PARAMETER_NAME)).getValue(),
- (String) ((ScalarArgument)
arguments.get(SPLIT_PARAMETER_NAME)).getValue());
+ (String)
+ ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(INPUT_PARAMETER_NAME),
+ (String)
+ ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(SPLIT_PARAMETER_NAME));
}
};
}
diff --git
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/EmptyTableFunctionHandle.java
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/EmptyTableFunctionHandle.java
new file mode 100644
index 00000000000..8cba5ba3857
--- /dev/null
+++
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/EmptyTableFunctionHandle.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.udf.api.relational;
+
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
+
+public class EmptyTableFunctionHandle implements TableFunctionHandle {
+ @Override
+ public byte[] serialize() {
+ return new byte[0];
+ }
+
+ @Override
+ public void deserialize(byte[] bytes) {}
+
+ @Override
+ public boolean equals(Object o) {
+ return o != null && getClass() == o.getClass();
+ }
+}
diff --git
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java
index 8c81840a4f8..a79c4336708 100644
---
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java
+++
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.udf.api.relational;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
@@ -81,6 +82,8 @@ public interface TableFunction extends SQLFunction {
* <li>A description of proper columns.
* <li>A map indicating which columns from the input table arguments
are required for the
* function to execute.
+ * <li>A TableFunctionExecutionInfo which stores all information
necessary to execute the
+ * table function.
* </ul>
* </ul>
*
@@ -91,13 +94,16 @@ public interface TableFunction extends SQLFunction {
*/
TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws
UDFException;
+ TableFunctionHandle createTableFunctionHandle();
+
/**
* This method is used to obtain a {@link TableFunctionProcessorProvider}
that will be responsible
* for creating processors to handle the transformation of input data into
output table. The
* provider is initialized with the validated arguments.
*
- * @param arguments a map of argument names to their corresponding {@link
Argument} values
+ * @param tableFunctionHandle the object containing the execution
information, which is generated
+ * in the {@link TableFunction#analyze} process.
* @return a {@link TableFunctionProcessorProvider} for creating processors
*/
- TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument>
arguments);
+ TableFunctionProcessorProvider getProcessorProvider(TableFunctionHandle
tableFunctionHandle);
}
diff --git
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java
new file mode 100644
index 00000000000..da27eb22cd2
--- /dev/null
+++
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.udf.api.relational.table;
+
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class MapTableFunctionHandle implements TableFunctionHandle {
+ private static final Set<Class<?>> SUPPORT_VALUE_TYPE =
+ new HashSet<>(
+ Arrays.asList(
+ Integer.class, Long.class, Double.class, Float.class,
String.class, Boolean.class));
+ private final Map<String, Object> map = new HashMap<>();
+
+ public void addProperty(String key, Object value) {
+ if (!SUPPORT_VALUE_TYPE.contains(value.getClass())) {
+ throw new IllegalArgumentException("Unsupported value type.");
+ }
+ map.put(key, value);
+ }
+
+ public Object getProperty(String key) {
+ return map.get(key);
+ }
+
+ @Override
+ public byte[] serialize() {
+ ByteBuffer buffer = ByteBuffer.allocate(calculateSerializeSize());
+ buffer.putInt(map.size());
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ byte[] bytes = entry.getKey().getBytes(StandardCharsets.UTF_8);
+ buffer.putInt(bytes.length);
+ buffer.put(bytes);
+ if (entry.getValue() instanceof Long) {
+ buffer.put(Type.INT64.getType());
+ buffer.putLong((Long) entry.getValue());
+ } else if (entry.getValue() instanceof Integer) {
+ buffer.put(Type.INT32.getType());
+ buffer.putInt((Integer) entry.getValue());
+ } else if (entry.getValue() instanceof Double) {
+ buffer.put(Type.DOUBLE.getType());
+ buffer.putDouble((Double) entry.getValue());
+ } else if (entry.getValue() instanceof Float) {
+ buffer.put(Type.FLOAT.getType());
+ buffer.putFloat((Float) entry.getValue());
+ } else if (entry.getValue() instanceof Boolean) {
+ buffer.put(Type.BOOLEAN.getType());
+ buffer.put(Boolean.TRUE.equals(entry.getValue()) ? (byte) 1 : (byte)
0);
+ } else if (entry.getValue() instanceof String) {
+ buffer.put(Type.STRING.getType());
+ bytes = ((String) entry.getValue()).getBytes(StandardCharsets.UTF_8);
+ buffer.putInt(bytes.length);
+ buffer.put(bytes);
+ }
+ }
+ return buffer.array();
+ }
+
+ private int calculateSerializeSize() {
+ int size = Integer.SIZE;
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ size += Integer.BYTES +
entry.getKey().getBytes(StandardCharsets.UTF_8).length + Byte.BYTES;
+ if (entry.getValue() instanceof Long) {
+ size += Long.BYTES;
+ } else if (entry.getValue() instanceof Integer) {
+ size += Integer.BYTES;
+ } else if (entry.getValue() instanceof Double) {
+ size += Double.BYTES;
+ } else if (entry.getValue() instanceof Float) {
+ size += Float.BYTES;
+ } else if (entry.getValue() instanceof Boolean) {
+ size += Byte.BYTES;
+ } else if (entry.getValue() instanceof String) {
+ byte[] bytes = ((String)
entry.getValue()).getBytes(StandardCharsets.UTF_8);
+ size += Integer.BYTES + bytes.length;
+ }
+ }
+ return size;
+ }
+
+ @Override
+ public void deserialize(byte[] bytes) {
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ int size = buffer.getInt();
+ for (int i = 0; i < size; i++) {
+ byte[] b = new byte[buffer.getInt()];
+ buffer.get(b);
+ String key = new String(b, StandardCharsets.UTF_8);
+ Type type = Type.valueOf(buffer.get());
+ switch (type) {
+ case BOOLEAN:
+ map.put(key, buffer.get() != 0);
+ break;
+ case INT32:
+ map.put(key, buffer.getInt());
+ break;
+ case INT64:
+ map.put(key, buffer.getLong());
+ break;
+ case FLOAT:
+ map.put(key, buffer.getFloat());
+ break;
+ case DOUBLE:
+ map.put(key, buffer.getDouble());
+ break;
+ case STRING:
+ b = new byte[buffer.getInt()];
+ buffer.get(b);
+ map.put(key, new String(b, StandardCharsets.UTF_8));
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown type: " + type);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("MapTableFunctionHandle{");
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ sb.append(entry.getKey()).append("=").append(entry.getValue()).append(",
");
+ }
+ if (sb.length() > 2) {
+ sb.setLength(sb.length() - 2); // remove last comma and space
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ MapTableFunctionHandle handle = (MapTableFunctionHandle) o;
+ return Objects.equals(map, handle.map);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(map);
+ }
+
+ public static class Builder {
+ private final Map<String, Object> map = new HashMap<>();
+
+ public Builder addProperty(String key, Object value) {
+ if (!SUPPORT_VALUE_TYPE.contains(value.getClass())) {
+ throw new IllegalArgumentException("Unsupported value type.");
+ }
+ map.put(key, value);
+ return this;
+ }
+
+ public MapTableFunctionHandle build() {
+ MapTableFunctionHandle handle = new MapTableFunctionHandle();
+ handle.map.putAll(map);
+ return handle;
+ }
+ }
+}
diff --git
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java
index bff404c1623..630fa814fca 100644
---
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java
+++
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java
@@ -58,13 +58,17 @@ public class TableFunctionAnalysis {
*/
private final boolean requireRecordSnapshot;
+ private final TableFunctionHandle handle;
+
private TableFunctionAnalysis(
Optional<DescribedSchema> properColumnSchema,
Map<String, List<Integer>> requiredColumns,
- boolean requiredRecordSnapshot) {
+ boolean requiredRecordSnapshot,
+ TableFunctionHandle handle) {
this.properColumnSchema = requireNonNull(properColumnSchema, "returnedType
is null");
this.requiredColumns = requiredColumns;
this.requireRecordSnapshot = requiredRecordSnapshot;
+ this.handle = requireNonNull(handle, "TableFunctionHandle is null");
}
public Optional<DescribedSchema> getProperColumnSchema() {
@@ -79,6 +83,10 @@ public class TableFunctionAnalysis {
return requireRecordSnapshot;
}
+ public TableFunctionHandle getTableFunctionHandle() {
+ return handle;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -87,6 +95,7 @@ public class TableFunctionAnalysis {
private DescribedSchema properColumnSchema;
private final Map<String, List<Integer>> requiredColumns = new HashMap<>();
private boolean requireRecordSnapshot = true;
+ private TableFunctionHandle executionInfo;
private Builder() {}
@@ -105,9 +114,17 @@ public class TableFunctionAnalysis {
return this;
}
+ public Builder handle(TableFunctionHandle executionInfo) {
+ this.executionInfo = executionInfo;
+ return this;
+ }
+
public TableFunctionAnalysis build() {
return new TableFunctionAnalysis(
- Optional.ofNullable(properColumnSchema), requiredColumns,
requireRecordSnapshot);
+ Optional.ofNullable(properColumnSchema),
+ requiredColumns,
+ requireRecordSnapshot,
+ executionInfo);
}
}
}
diff --git
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionHandle.java
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionHandle.java
new file mode 100644
index 00000000000..86f85688f2a
--- /dev/null
+++
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionHandle.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.udf.api.relational.table;
+
+/**
+ * An area to store all information necessary to execute the table function,
gathered at analysis
+ * time
+ */
+public interface TableFunctionHandle {
+ /**
+ * Serialize your state into byte array. The order of serialization must be
consistent with
+ * deserialization.
+ */
+ byte[] serialize();
+
+ /**
+ * Deserialize byte array into your state. The order of deserialization must
be consistent with
+ * serialization.
+ */
+ void deserialize(byte[] bytes);
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index e98259a353a..7642ac0f5bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -2862,7 +2862,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
TableFunctionProcessorNode node, LocalExecutionPlanContext context) {
TableFunction tableFunction = metadata.getTableFunction(node.getName());
TableFunctionProcessorProvider processorProvider =
- tableFunction.getProcessorProvider(node.getArguments());
+ tableFunction.getProcessorProvider(node.getTableFunctionHandle());
if (node.getChildren().isEmpty()) {
List<TSDataType> outputDataTypes =
node.getOutputSymbols().stream()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index d9eef926e92..16f2f45930f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -82,9 +82,6 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctio
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode;
-import org.apache.iotdb.udf.api.relational.table.argument.Argument;
-import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
-import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
import com.google.common.base.Joiner;
import org.apache.commons.lang3.Validate;
@@ -99,7 +96,6 @@ import java.util.Map.Entry;
import java.util.Optional;
import static com.google.common.base.Preconditions.checkArgument;
-import static java.lang.String.format;
import static org.apache.iotdb.db.utils.DateTimeUtils.TIMESTAMP_PRECISION;
public class PlanGraphPrinter extends PlanVisitor<List<String>,
PlanGraphPrinter.GraphContext> {
@@ -1045,34 +1041,10 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
.getOrderingScheme()
.ifPresent(orderingScheme -> boxValue.add("Order by: " +
orderingScheme));
});
- if (!node.getArguments().isEmpty()) {
- node.getArguments().forEach((key, value) ->
boxValue.add(formatArgument(key, value)));
- }
+ boxValue.add("TableFunctionHandle: " + node.getTableFunctionHandle());
return render(node, boxValue, context);
}
- private String formatArgument(String argumentName, Argument argument) {
- if (argument instanceof ScalarArgument) {
- return formatScalarArgument(argumentName, (ScalarArgument) argument);
- } else if (argument instanceof TableArgument) {
- return formatTableArgument(argumentName, (TableArgument) argument);
- } else {
- return argumentName + " => " + argument;
- }
- }
-
- private String formatScalarArgument(String argumentName, ScalarArgument
argument) {
- return format(
- "%s => ScalarArgument{type=%s, value=%s}",
- argumentName, argument.getType(), argument.getValue());
- }
-
- private String formatTableArgument(String argumentName, TableArgument
argument) {
- return format(
- "%s => TableArgument{%s}",
- argumentName, argument.isRowSemantics() ? "row semantics" : "set
semantics");
- }
-
private String printRegion(TRegionReplicaSet regionReplicaSet) {
return String.format(
"Partition: %s",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index 8c945bd72ab..a31f84f6d89 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -4140,6 +4140,7 @@ public class StatementAnalyzer {
new TableFunctionInvocationAnalysis(
node.getName().toString(),
argumentsAnalysis.getPassedArguments(),
+ functionAnalysis.getTableFunctionHandle(),
orderedTableArguments.build(),
requiredColumns,
properSchema.map(describedSchema ->
describedSchema.getFields().size()).orElse(0),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/tablefunction/TableFunctionInvocationAnalysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/tablefunction/TableFunctionInvocationAnalysis.java
index 73900573fa9..1740ac2120a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/tablefunction/TableFunctionInvocationAnalysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/tablefunction/TableFunctionInvocationAnalysis.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.relational.analyzer.tablefunction;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import com.google.common.collect.ImmutableList;
@@ -29,6 +30,7 @@ import java.util.Map;
public class TableFunctionInvocationAnalysis {
private final String functionName;
private final Map<String, Argument> passedArguments;
+ private final TableFunctionHandle tableFunctionHandle;
private final List<TableArgumentAnalysis> tableArgumentAnalyses;
private final Map<String, List<Integer>> requiredColumns;
private final int properColumnsCount;
@@ -37,12 +39,14 @@ public class TableFunctionInvocationAnalysis {
public TableFunctionInvocationAnalysis(
String name,
Map<String, Argument> passedArguments,
+ TableFunctionHandle tableFunctionHandle,
ImmutableList<TableArgumentAnalysis> tableArgumentAnalyses,
Map<String, List<Integer>> requiredColumns,
int properColumnsCount,
boolean requiredRecordSnapshot) {
this.functionName = name;
this.passedArguments = passedArguments;
+ this.tableFunctionHandle = tableFunctionHandle;
this.tableArgumentAnalyses = tableArgumentAnalyses;
this.requiredColumns = requiredColumns;
this.properColumnsCount = properColumnsCount;
@@ -65,6 +69,10 @@ public class TableFunctionInvocationAnalysis {
return passedArguments;
}
+ public TableFunctionHandle getTableFunctionHandle() {
+ return tableFunctionHandle;
+ }
+
public int getProperColumnsCount() {
return properColumnsCount;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 4d14ebaf06b..8a726286015 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -918,7 +918,7 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
new TableFunctionNode(
idAllocator.genPlanNodeId(),
functionAnalysis.getFunctionName(),
- functionAnalysis.getPassedArguments(),
+ functionAnalysis.getTableFunctionHandle(),
properOutputs,
sources.build(),
sourceProperties.build());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
index e1079d9d5b3..87df600fe01 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java
@@ -73,7 +73,7 @@ public class ImplementTableFunctionSource implements
Rule<TableFunctionNode> {
ImmutableList.of(),
Optional.empty(),
false,
- node.getArguments(),
+ node.getTableFunctionHandle(),
false));
} else if (node.getChildren().size() == 1) {
// Single source does not require pre-processing.
@@ -125,7 +125,7 @@ public class ImplementTableFunctionSource implements
Rule<TableFunctionNode> {
sourceProperties.getRequiredColumns(),
sourceProperties.getDataOrganizationSpecification(),
sourceProperties.isRowSemantics(),
- node.getArguments(),
+ node.getTableFunctionHandle(),
sourceProperties.isRequireRecordSnapshot()));
} else {
// we don't support multiple source now.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java
index 60792dad57d..1886ed39e85 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java
@@ -82,7 +82,7 @@ public class PruneTableFunctionProcessorColumns
node.getRequiredSymbols(),
node.getDataOrganizationSpecification(),
node.isRowSemantic(),
- node.getArguments(),
+ node.getTableFunctionHandle(),
node.isRequireRecordSnapshot()));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java
index 6de6efb55fb..e9fd2e9db00 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java
@@ -91,7 +91,7 @@ public class PruneTableFunctionProcessorSourceColumns
implements Rule<TableFunct
node.getRequiredSymbols(),
node.getDataOrganizationSpecification(),
node.isRowSemantic(),
- node.getArguments(),
+ node.getTableFunctionHandle(),
node.isRequireRecordSnapshot())))
.orElse(Result.empty());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java
index e1326d97a9c..a4db3d68d2f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java
@@ -24,12 +24,12 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
-import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -37,7 +37,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -47,20 +46,20 @@ import static java.util.Objects.requireNonNull;
public class TableFunctionNode extends MultiChildProcessNode {
private final String name;
- private final Map<String, Argument> arguments;
+ private final TableFunctionHandle tableFunctionHandle;
private final List<Symbol> properOutputs;
private final List<TableArgumentProperties> tableArgumentProperties;
public TableFunctionNode(
PlanNodeId id,
String name,
- Map<String, Argument> arguments,
+ TableFunctionHandle tableFunctionHandle,
List<Symbol> properOutputs,
List<PlanNode> children,
List<TableArgumentProperties> tableArgumentProperties) {
super(id, children);
this.name = requireNonNull(name, "name is null");
- this.arguments = ImmutableMap.copyOf(arguments);
+ this.tableFunctionHandle = tableFunctionHandle;
this.properOutputs = ImmutableList.copyOf(properOutputs);
this.tableArgumentProperties =
ImmutableList.copyOf(tableArgumentProperties);
}
@@ -68,12 +67,12 @@ public class TableFunctionNode extends
MultiChildProcessNode {
public TableFunctionNode(
PlanNodeId id,
String name,
- Map<String, Argument> arguments,
+ TableFunctionHandle tableFunctionHandle,
List<Symbol> properOutputs,
List<TableArgumentProperties> tableArgumentProperties) {
super(id);
this.name = requireNonNull(name, "name is null");
- this.arguments = ImmutableMap.copyOf(arguments);
+ this.tableFunctionHandle = tableFunctionHandle;
this.properOutputs = ImmutableList.copyOf(properOutputs);
this.tableArgumentProperties =
ImmutableList.copyOf(tableArgumentProperties);
}
@@ -82,8 +81,8 @@ public class TableFunctionNode extends MultiChildProcessNode {
return name;
}
- public Map<String, Argument> getArguments() {
- return arguments;
+ public TableFunctionHandle getTableFunctionHandle() {
+ return tableFunctionHandle;
}
public List<Symbol> getProperOutputs() {
@@ -96,7 +95,8 @@ public class TableFunctionNode extends MultiChildProcessNode {
@Override
public PlanNode clone() {
- return new TableFunctionNode(id, name, arguments, properOutputs,
tableArgumentProperties);
+ return new TableFunctionNode(
+ id, name, tableFunctionHandle, properOutputs, tableArgumentProperties);
}
@Override
@@ -130,18 +130,21 @@ public class TableFunctionNode extends
MultiChildProcessNode {
public PlanNode replaceChildren(List<PlanNode> newSources) {
checkArgument(children.size() == newSources.size(), "wrong number of new
children");
return new TableFunctionNode(
- getPlanNodeId(), name, arguments, properOutputs, newSources,
tableArgumentProperties);
+ getPlanNodeId(),
+ name,
+ tableFunctionHandle,
+ properOutputs,
+ newSources,
+ tableArgumentProperties);
}
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.TABLE_FUNCTION_NODE.serialize(byteBuffer);
ReadWriteIOUtils.write(name, byteBuffer);
- ReadWriteIOUtils.write(arguments.size(), byteBuffer);
- for (Map.Entry<String, Argument> entry : arguments.entrySet()) {
- ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
- entry.getValue().serialize(byteBuffer);
- }
+ byte[] bytes = tableFunctionHandle.serialize();
+ ReadWriteIOUtils.write(bytes.length, byteBuffer);
+ ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), byteBuffer);
ReadWriteIOUtils.write(properOutputs.size(), byteBuffer);
properOutputs.forEach(symbol -> Symbol.serialize(symbol, byteBuffer));
ReadWriteIOUtils.write(tableArgumentProperties.size(), byteBuffer);
@@ -165,11 +168,9 @@ public class TableFunctionNode extends
MultiChildProcessNode {
protected void serializeAttributes(DataOutputStream stream) throws
IOException {
PlanNodeType.TABLE_FUNCTION_NODE.serialize(stream);
ReadWriteIOUtils.write(name, stream);
- ReadWriteIOUtils.write(arguments.size(), stream);
- for (Map.Entry<String, Argument> entry : arguments.entrySet()) {
- ReadWriteIOUtils.write(entry.getKey(), stream);
- entry.getValue().serialize(stream);
- }
+ byte[] bytes = tableFunctionHandle.serialize();
+ ReadWriteIOUtils.write(bytes.length, stream);
+ ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), stream);
ReadWriteIOUtils.write(properOutputs.size(), stream);
for (Symbol symbol : properOutputs) {
Symbol.serialize(symbol, stream);
@@ -194,12 +195,10 @@ public class TableFunctionNode extends
MultiChildProcessNode {
public static TableFunctionNode deserialize(ByteBuffer byteBuffer) {
String name = ReadWriteIOUtils.readString(byteBuffer);
int size = ReadWriteIOUtils.readInt(byteBuffer);
- ImmutableMap.Builder<String, Argument> arguments = ImmutableMap.builder();
- for (int i = 0; i < size; i++) {
- String key = ReadWriteIOUtils.readString(byteBuffer);
- Argument value = Argument.deserialize(byteBuffer);
- arguments.put(key, value);
- }
+ byte[] bytes = ReadWriteIOUtils.readBytes(byteBuffer, size);
+ TableFunctionHandle tableFunctionHandle =
+ new
TableMetadataImpl().getTableFunction(name).createTableFunctionHandle();
+ tableFunctionHandle.deserialize(bytes);
size = ReadWriteIOUtils.readInt(byteBuffer);
ImmutableList.Builder<Symbol> properOutputs = ImmutableList.builder();
for (int i = 0; i < size; i++) {
@@ -237,7 +236,7 @@ public class TableFunctionNode extends
MultiChildProcessNode {
return new TableFunctionNode(
planNodeId,
name,
- arguments.build(),
+ tableFunctionHandle,
properOutputs.build(),
tableArgumentProperties.build());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java
index 57184818de7..2ffb128b66b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java
@@ -24,21 +24,19 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
-import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -63,7 +61,7 @@ public class TableFunctionProcessorNode extends
SingleChildProcessNode {
private final boolean rowSemantic;
- private final Map<String, Argument> arguments;
+ private final TableFunctionHandle tableFunctionHandle;
private final boolean requireRecordSnapshot;
@@ -76,7 +74,7 @@ public class TableFunctionProcessorNode extends
SingleChildProcessNode {
List<Symbol> requiredSymbols,
Optional<DataOrganizationSpecification> dataOrganizationSpecification,
boolean rowSemantic,
- Map<String, Argument> arguments,
+ TableFunctionHandle tableFunctionHandle,
boolean requireRecordSnapshot) {
super(id, source.orElse(null));
this.name = requireNonNull(name, "name is null");
@@ -86,7 +84,7 @@ public class TableFunctionProcessorNode extends
SingleChildProcessNode {
this.dataOrganizationSpecification =
requireNonNull(dataOrganizationSpecification, "specification is null");
this.rowSemantic = rowSemantic;
- this.arguments = ImmutableMap.copyOf(arguments);
+ this.tableFunctionHandle = tableFunctionHandle;
this.requireRecordSnapshot = requireRecordSnapshot;
}
@@ -98,7 +96,7 @@ public class TableFunctionProcessorNode extends
SingleChildProcessNode {
List<Symbol> requiredSymbols,
Optional<DataOrganizationSpecification> dataOrganizationSpecification,
boolean rowSemantic,
- Map<String, Argument> arguments,
+ TableFunctionHandle tableFunctionHandle,
boolean requireRecordSnapshot) {
super(id);
this.name = requireNonNull(name, "name is null");
@@ -108,7 +106,7 @@ public class TableFunctionProcessorNode extends
SingleChildProcessNode {
this.dataOrganizationSpecification =
requireNonNull(dataOrganizationSpecification, "specification is null");
this.rowSemantic = rowSemantic;
- this.arguments = ImmutableMap.copyOf(arguments);
+ this.tableFunctionHandle = tableFunctionHandle;
this.requireRecordSnapshot = requireRecordSnapshot;
}
@@ -136,8 +134,8 @@ public class TableFunctionProcessorNode extends
SingleChildProcessNode {
return dataOrganizationSpecification;
}
- public Map<String, Argument> getArguments() {
- return arguments;
+ public TableFunctionHandle getTableFunctionHandle() {
+ return tableFunctionHandle;
}
public boolean isRequireRecordSnapshot() {
@@ -154,7 +152,7 @@ public class TableFunctionProcessorNode extends
SingleChildProcessNode {
requiredSymbols,
dataOrganizationSpecification,
rowSemantic,
- arguments,
+ tableFunctionHandle,
requireRecordSnapshot);
}
@@ -204,12 +202,8 @@ public class TableFunctionProcessorNode extends
SingleChildProcessNode {
dataOrganizationSpecification.get().serialize(byteBuffer);
}
ReadWriteIOUtils.write(rowSemantic, byteBuffer);
- ReadWriteIOUtils.write(arguments.size(), byteBuffer);
- arguments.forEach(
- (key, value) -> {
- ReadWriteIOUtils.write(key, byteBuffer);
- value.serialize(byteBuffer);
- });
+ byte[] bytes = tableFunctionHandle.serialize();
+ ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), byteBuffer);
ReadWriteIOUtils.write(requireRecordSnapshot, byteBuffer);
}
@@ -234,11 +228,8 @@ public class TableFunctionProcessorNode extends
SingleChildProcessNode {
dataOrganizationSpecification.get().serialize(stream);
}
ReadWriteIOUtils.write(rowSemantic, stream);
- ReadWriteIOUtils.write(arguments.size(), stream);
- for (Map.Entry<String, Argument> entry : arguments.entrySet()) {
- ReadWriteIOUtils.write(entry.getKey(), stream);
- entry.getValue().serialize(stream);
- }
+ byte[] bytes = tableFunctionHandle.serialize();
+ ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), stream);
ReadWriteIOUtils.write(requireRecordSnapshot, stream);
}
@@ -266,12 +257,10 @@ public class TableFunctionProcessorNode extends
SingleChildProcessNode {
: Optional.empty();
boolean rowSemantic = ReadWriteIOUtils.readBoolean(byteBuffer);
size = ReadWriteIOUtils.readInt(byteBuffer);
- Map<String, Argument> arguments = new HashMap<>(size);
- while (size-- > 0) {
- String key = ReadWriteIOUtils.readString(byteBuffer);
- Argument value = Argument.deserialize(byteBuffer);
- arguments.put(key, value);
- }
+ byte[] bytes = ReadWriteIOUtils.readBytes(byteBuffer, size);
+ TableFunctionHandle tableFunctionHandle =
+ new
TableMetadataImpl().getTableFunction(name).createTableFunctionHandle();
+ tableFunctionHandle.deserialize(bytes);
boolean requireRecordSnapshot = ReadWriteIOUtils.readBoolean(byteBuffer);
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
@@ -283,7 +272,7 @@ public class TableFunctionProcessorNode extends
SingleChildProcessNode {
requiredSymbols,
dataOrganizationSpecification,
rowSemantic,
- arguments,
+ tableFunctionHandle,
requireRecordSnapshot);
}
@@ -300,7 +289,7 @@ public class TableFunctionProcessorNode extends
SingleChildProcessNode {
requiredSymbols,
dataOrganizationSpecification,
rowSemantic,
- arguments,
+ tableFunctionHandle,
requireRecordSnapshot);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
index d12db4d3034..b9ac030a8b8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
@@ -870,7 +870,7 @@ public class UnaliasSymbolReferences implements
PlanOptimizer {
new TableFunctionNode(
node.getPlanNodeId(),
node.getName(),
- node.getArguments(),
+ node.getTableFunctionHandle(),
newProperOutputs,
newSources.build(),
newTableArgumentProperties.build()),
@@ -893,7 +893,7 @@ public class UnaliasSymbolReferences implements
PlanOptimizer {
ImmutableList.of(),
Optional.empty(),
node.isRowSemantic(),
- node.getArguments(),
+ node.getTableFunctionHandle(),
node.isRequireRecordSnapshot()),
mapping);
}
@@ -930,7 +930,7 @@ public class UnaliasSymbolReferences implements
PlanOptimizer {
newRequiredSymbols,
newSpecification,
node.isRowSemantic(),
- node.getArguments(),
+ node.getTableFunctionHandle(),
node.isRequireRecordSnapshot());
return new PlanAndMappings(rewrittenTableFunctionProcessor, mapping);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Exclude.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Exclude.java
index 8fd86f684bf..bc5289da14a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Exclude.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Exclude.java
@@ -20,8 +20,10 @@
package org.apache.iotdb.db.queryengine.plan.function;
import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.EmptyTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -66,11 +68,18 @@ public class Exclude implements TableFunction {
return TableFunctionAnalysis.builder()
.properColumnSchema(schemaBuilder.build())
.requiredColumns(TBL_PARAM, requiredColumns)
+ .handle(new EmptyTableFunctionHandle())
.build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new EmptyTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java
index 19a4fe3cecc..ea56adfaef1 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java
@@ -23,7 +23,9 @@ import
org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -62,22 +64,32 @@ public class Repeat implements TableFunction {
throw new UDFArgumentNotValidException(
"count argument for function repeat() must be positive");
}
+
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder().addProperty(N_PARAM,
count.getValue()).build();
return TableFunctionAnalysis.builder()
.properColumnSchema(DescribedSchema.builder().addField("repeat_index",
Type.INT32).build())
.requiredColumns(
TBL_PARAM,
Collections.singletonList(0)) // per spec, function must require
at least one column
+ .handle(handle)
.build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
- ScalarArgument count = (ScalarArgument) arguments.get("N");
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
return new TableFunctionDataProcessor() {
- private final int n = (int) count.getValue();
+ private final int n =
+ (int) ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(N_PARAM);
private long recordIndex = 0;
@Override
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java
index fede9cae8a7..931a64d89ad 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.queryengine.plan.function;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -57,17 +59,34 @@ public class Split implements TableFunction {
@Override
public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws
UDFException {
DescribedSchema schema = DescribedSchema.builder().addField("output",
Type.STRING).build();
- return TableFunctionAnalysis.builder().properColumnSchema(schema).build();
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder()
+ .addProperty(
+ INPUT_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(INPUT_PARAMETER_NAME)).getValue())
+ .addProperty(
+ SPLIT_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(SPLIT_PARAMETER_NAME)).getValue())
+ .build();
+ return
TableFunctionAnalysis.builder().properColumnSchema(schema).handle(handle).build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionLeafProcessor getSplitProcessor() {
return new SplitProcessor(
- (String) ((ScalarArgument)
arguments.get(INPUT_PARAMETER_NAME)).getValue(),
- (String) ((ScalarArgument)
arguments.get(SPLIT_PARAMETER_NAME)).getValue());
+ (String)
+ ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(INPUT_PARAMETER_NAME),
+ (String)
+ ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(SPLIT_PARAMETER_NAME));
}
};
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java
index 71854d8b5be..3aa3d85fa80 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java
@@ -24,6 +24,8 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.PlanTester;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.TableFunctionProcessorMatcher;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
+import org.apache.iotdb.udf.api.relational.EmptyTableFunctionHandle;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -44,10 +46,8 @@ import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.sort;
-import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.specification;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableFunctionProcessor;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
-import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.TableFunctionProcessorMatcher.TableArgumentValue.Builder.tableArgument;
public class TableFunctionTest {
@@ -73,16 +73,12 @@ public class TableFunctionTest {
.name("hop")
.properOutputs("window_start", "window_end")
.requiredSymbols("time")
- .addScalarArgument("TIMECOL", "time")
- .addScalarArgument("SIZE", 3600000L)
- .addScalarArgument("SLIDE", 1800000L)
- .addScalarArgument("ORIGIN", 0L)
- .addTableArgument(
- "DATA",
- tableArgument()
- .rowSemantics()
- .passThroughSymbols(
- "time", "tag1", "tag2", "tag3", "attr1", "attr2",
"s1", "s2", "s3"));
+ .handle(
+ new MapTableFunctionHandle.Builder()
+ .addProperty("SIZE", 3600000L)
+ .addProperty("SLIDE", 1800000L)
+ .addProperty("ORIGIN", 0L)
+ .build());
// Verify full LogicalPlan
// Output - TableFunctionProcessor - TableScan
assertPlan(logicalQueryPlan,
anyTree(tableFunctionProcessor(tableFunctionMatcher, tableScan)));
@@ -129,14 +125,7 @@ public class TableFunctionTest {
.properOutputs("time", "tag1", "tag2", "tag3", "attr2", "s1",
"s2", "s3")
.requiredSymbols(
"time_0", "tag1_1", "tag2_2", "tag3_3", "attr2_4", "s1_5",
"s2_6", "s3_7")
- .addScalarArgument("EXCLUDE", "attr1")
- .addTableArgument(
- "DATA",
- tableArgument()
- .specification(
- specification(
- ImmutableList.of(), ImmutableList.of(),
ImmutableMap.of()))
- .rowSemantics());
+ .handle(new EmptyTableFunctionHandle());
// Verify full LogicalPlan
// Output - TableFunctionProcessor - TableScan
assertPlan(logicalQueryPlan,
anyTree(tableFunctionProcessor(tableFunctionMatcher, tableScan)));
@@ -185,17 +174,7 @@ public class TableFunctionTest {
.name("repeat")
.properOutputs("repeat_index")
.requiredSymbols("time")
- .addScalarArgument("N", 2)
- .addTableArgument(
- "DATA",
- tableArgument()
- .specification(
- specification(
- ImmutableList.of("tag1", "tag2", "tag3"),
- ImmutableList.of(),
- ImmutableMap.of()))
- .passThroughSymbols(
- "time", "tag1", "tag2", "tag3", "attr1", "attr2",
"s1", "s2", "s3"));
+ .handle(new MapTableFunctionHandle.Builder().addProperty("N",
2).build());
// Verify full LogicalPlan
// Output - TableFunctionProcessor - GroupNode - TableScan
assertPlan(
@@ -241,7 +220,11 @@ public class TableFunctionTest {
.name("split")
.properOutputs("output")
.requiredSymbols()
- .addScalarArgument("INPUT", "1,2,3,4,5");
+ .handle(
+ new MapTableFunctionHandle.Builder()
+ .addProperty("INPUT", "1,2,3,4,5")
+ .addProperty("SPLIT", ",")
+ .build());
// Verify full LogicalPlan
// Output - TableFunctionProcessor - TableScan
assertPlan(logicalQueryPlan,
anyTree(tableFunctionProcessor(tableFunctionMatcher)));
@@ -255,14 +238,22 @@ public class TableFunctionTest {
.name("split")
.properOutputs("output")
.requiredSymbols()
- .addScalarArgument("INPUT", "1,2,4,5");
+ .handle(
+ new MapTableFunctionHandle.Builder()
+ .addProperty("INPUT", "1,2,4,5")
+ .addProperty("SPLIT", ",")
+ .build());
Consumer<TableFunctionProcessorMatcher.Builder> tableFunctionMatcher2 =
builder ->
builder
.name("split")
.properOutputs("output_0")
.requiredSymbols()
- .addScalarArgument("INPUT", "2,3,4");
+ .handle(
+ new MapTableFunctionHandle.Builder()
+ .addProperty("INPUT", "2,3,4")
+ .addProperty("SPLIT", ",")
+ .build());
// Verify full LogicalPlan
// Output - TableFunctionProcessor - TableScan
assertPlan(
@@ -301,21 +292,19 @@ public class TableFunctionTest {
.properOutputs("time", "tag1", "tag2", "tag3", "attr2", "s1",
"s2", "s3")
.requiredSymbols(
"time_0", "tag1_1", "tag2_2", "tag3_3", "attr2_4", "s1_5",
"s2_6", "s3_7")
- .addScalarArgument("EXCLUDE", "attr1")
- .addTableArgument("DATA", tableArgument().rowSemantics());
+ .handle(new EmptyTableFunctionHandle());
Consumer<TableFunctionProcessorMatcher.Builder> hopMatcher =
builder ->
builder
.name("hop")
.properOutputs("window_start", "window_end")
.requiredSymbols("time")
- .addScalarArgument("TIMECOL", "time")
- .addScalarArgument("SIZE", 3600000L)
- .addScalarArgument("SLIDE", 1800000L)
- .addScalarArgument("ORIGIN", 0L)
- .addTableArgument(
- "DATA",
- tableArgument().rowSemantics().passThroughSymbols("tag1",
"tag2", "tag3"));
+ .handle(
+ new MapTableFunctionHandle.Builder()
+ .addProperty("SIZE", 3600000L)
+ .addProperty("SLIDE", 1800000L)
+ .addProperty("ORIGIN", 0L)
+ .build());
// Verify full LogicalPlan
// Output - Aggregation - HOP - Project - EXCLUDE - TableScan
assertPlan(
@@ -326,4 +315,21 @@ public class TableFunctionTest {
tableFunctionProcessor(
hopMatcher, project(tableFunctionProcessor(excludeMatcher,
tableScan))))));
}
+
+ @Test
+ public void testSerDeserializeMapTableFunctionHandle() {
+ MapTableFunctionHandle mapTableFunctionHandle =
+ new MapTableFunctionHandle.Builder()
+ .addProperty("key1", "value1")
+ .addProperty("key2", 2)
+ .addProperty("key3", 1L)
+ .addProperty("key4", 3.0)
+ .addProperty("key5", true)
+ .addProperty("key6", 2.3f)
+ .build();
+ byte[] serialized = mapTableFunctionHandle.serialize();
+ MapTableFunctionHandle deserialized = new MapTableFunctionHandle();
+ deserialized.deserialize(serialized);
+ assert mapTableFunctionHandle.equals(deserialized);
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/TableFunctionProcessorMatcher.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/TableFunctionProcessorMatcher.java
index 52d553368f7..c0c534f494d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/TableFunctionProcessorMatcher.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/TableFunctionProcessorMatcher.java
@@ -22,29 +22,19 @@ package
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
-import
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
-import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;
-import org.apache.iotdb.udf.api.relational.table.argument.Argument;
-import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
-import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
-import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.MatchResult.NO_MATCH;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.MatchResult.match;
@@ -54,17 +44,17 @@ public class TableFunctionProcessorMatcher implements
Matcher {
private final String name;
private final List<String> properOutputs;
private final List<String> requiredSymbols;
- private final Map<String, ArgumentValue> arguments;
+ private final TableFunctionHandle handle;
private TableFunctionProcessorMatcher(
String name,
List<String> properOutputs,
List<String> requiredSymbols,
- Map<String, ArgumentValue> arguments) {
+ TableFunctionHandle handle) {
this.name = requireNonNull(name, "name is null");
this.properOutputs = ImmutableList.copyOf(properOutputs);
this.requiredSymbols = ImmutableList.copyOf(requiredSymbols);
- this.arguments = ImmutableMap.copyOf(arguments);
+ this.handle = handle;
}
@Override
@@ -96,67 +86,8 @@ public class TableFunctionProcessorMatcher implements
Matcher {
if (!expectedRequired.equals(actualRequired)) {
return NO_MATCH;
}
- for (Map.Entry<String, ArgumentValue> entry : arguments.entrySet()) {
- String argumentName = entry.getKey();
- Argument actual =
tableFunctionProcessorNode.getArguments().get(argumentName);
- if (actual == null) {
- return NO_MATCH;
- }
- ArgumentValue expected = entry.getValue();
- if (expected instanceof ScalarArgumentValue) {
- if (!(actual instanceof ScalarArgument)) {
- return NO_MATCH;
- }
- ScalarArgumentValue expectedScalar = (ScalarArgumentValue) expected;
- ScalarArgument actualScalar = (ScalarArgument) actual;
- if (!Objects.equals(expectedScalar.value, actualScalar.getValue())) {
- return NO_MATCH;
- }
-
- } else {
- if (!(actual instanceof TableArgument)) {
- return NO_MATCH;
- }
- TableArgumentValue expectedTableArgument = (TableArgumentValue)
expected;
- TableArgument actualTableArgument = (TableArgument) actual;
- if (expectedTableArgument.rowSemantics !=
actualTableArgument.isRowSemantics()) {
- // check row semantic
- return NO_MATCH;
- }
- if (expectedTableArgument.passThroughColumns) {
- // check pass through columns
- Optional<TableFunctionNode.PassThroughSpecification>
passThroughSpecification =
- tableFunctionProcessorNode.getPassThroughSpecification();
- if (!passThroughSpecification.isPresent()
- || !passThroughSpecification.get().isDeclaredAsPassThrough()) {
- return NO_MATCH;
- }
- Set<SymbolReference> expectedPassThrough =
- expectedTableArgument.passThroughSymbol.stream()
- .map(symbolAliases::get)
- .collect(toImmutableSet());
- Set<SymbolReference> actualPassThrough =
- passThroughSpecification.get().getColumns().stream()
- .map(TableFunctionNode.PassThroughColumn::getSymbol)
- .map(Symbol::toSymbolReference)
- .collect(toImmutableSet());
-
- if (!expectedPassThrough.equals(actualPassThrough)) {
- return NO_MATCH;
- }
- }
- if (expectedTableArgument.specification.isPresent()
- &&
tableFunctionProcessorNode.getDataOrganizationSpecification().isPresent()) {
- // check data organization
- DataOrganizationSpecification expectedDataOrganization =
-
expectedTableArgument.specification.get().getExpectedValue(symbolAliases);
- DataOrganizationSpecification actualDataOrganization =
-
tableFunctionProcessorNode.getDataOrganizationSpecification().get();
- if (!expectedDataOrganization.equals(actualDataOrganization)) {
- return NO_MATCH;
- }
- }
- }
+ if (!handle.equals(tableFunctionProcessorNode.getTableFunctionHandle())) {
+ return NO_MATCH;
}
ImmutableMap.Builder<String, SymbolReference> properOutputsMapping =
ImmutableMap.builder();
for (int i = 0; i < properOutputs.size(); i++) {
@@ -179,17 +110,15 @@ public class TableFunctionProcessorMatcher implements
Matcher {
.add("name", name)
.add("properOutputs", properOutputs)
.add("requiredSymbols", requiredSymbols)
- .add("arguments", arguments)
+ .add("handle", handle)
.toString();
}
- public interface ArgumentValue {}
-
public static class Builder {
private String name;
private List<String> properOutputs = ImmutableList.of();
private List<String> requiredSymbols = ImmutableList.of();
- private final ImmutableMap.Builder<String, ArgumentValue> arguments =
ImmutableMap.builder();
+ private TableFunctionHandle handle;
public Builder name(String name) {
this.name = name;
@@ -206,98 +135,13 @@ public class TableFunctionProcessorMatcher implements
Matcher {
return this;
}
- public TableFunctionProcessorMatcher.Builder addScalarArgument(String
name, Object value) {
- this.arguments.put(name, new ScalarArgumentValue(value));
- return this;
- }
-
- public TableFunctionProcessorMatcher.Builder addTableArgument(
- String name, TableArgumentValue.Builder tableArgument) {
- this.arguments.put(name, tableArgument.build());
+ public Builder handle(TableFunctionHandle handle) {
+ this.handle = handle;
return this;
}
public TableFunctionProcessorMatcher build() {
- return new TableFunctionProcessorMatcher(
- name, properOutputs, requiredSymbols, arguments.build());
- }
- }
-
- public static class ScalarArgumentValue implements ArgumentValue {
- protected Object value;
-
- public ScalarArgumentValue(Object value) {
- this.value = value;
- }
-
- @Override
- public String toString() {
- return toStringHelper(this).add("value", value).toString();
- }
- }
-
- public static class TableArgumentValue implements ArgumentValue {
- protected boolean rowSemantics;
- protected boolean passThroughColumns;
- protected Optional<ExpectedValueProvider<DataOrganizationSpecification>>
specification;
- protected Set<String> passThroughSymbol;
-
- public TableArgumentValue(
- boolean rowSemantics,
- boolean passThroughColumns,
- Optional<ExpectedValueProvider<DataOrganizationSpecification>>
specification,
- Set<String> passThroughSymbol) {
- this.rowSemantics = rowSemantics;
- this.passThroughColumns = passThroughColumns;
- this.specification = specification;
- this.passThroughSymbol = passThroughSymbol;
- }
-
- @Override
- public String toString() {
- return toStringHelper(this)
- .omitNullValues()
- .add("rowSemantics", rowSemantics)
- .add("passThroughColumns", passThroughColumns)
- .add("specification", specification)
- .add("passThroughSymbol", passThroughSymbol)
- .toString();
- }
-
- public static class Builder {
- private boolean rowSemantics;
- private boolean passThroughColumns;
- private Optional<ExpectedValueProvider<DataOrganizationSpecification>>
specification =
- Optional.empty();
- private Set<String> passThroughSymbols = ImmutableSet.of();
-
- private Builder() {}
-
- public static Builder tableArgument() {
- return new Builder();
- }
-
- public Builder rowSemantics() {
- this.rowSemantics = true;
- return this;
- }
-
- public Builder specification(
- ExpectedValueProvider<DataOrganizationSpecification> specification) {
- this.specification = Optional.of(specification);
- return this;
- }
-
- public Builder passThroughSymbols(String... symbols) {
- this.passThroughColumns = true;
- this.passThroughSymbols = ImmutableSet.copyOf(symbols);
- return this;
- }
-
- private TableArgumentValue build() {
- return new TableArgumentValue(
- rowSemantics, passThroughColumns, specification,
passThroughSymbols);
- }
+ return new TableFunctionProcessorMatcher(name, properOutputs,
requiredSymbols, handle);
}
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
index 380a19be1a7..d26ba6d8376 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -60,33 +62,43 @@ public class CapacityTableFunction implements TableFunction
{
if (size <= 0) {
throw new UDFException("Size must be greater than 0");
}
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder().addProperty(SIZE_PARAMETER_NAME,
size).build();
return TableFunctionAnalysis.builder()
.properColumnSchema(
new DescribedSchema.Builder().addField("window_index",
Type.INT64).build())
.requireRecordSnapshot(false)
.requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(0))
+ .handle(handle)
.build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
- long sz = (long) ((ScalarArgument)
arguments.get(SIZE_PARAMETER_NAME)).getValue();
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
+ long sz =
+ (long) ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(SIZE_PARAMETER_NAME);
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
- return new CountDataProcessor(sz);
+ return new CapacityDataProcessor(sz);
}
};
}
- private static class CountDataProcessor implements
TableFunctionDataProcessor {
+ private static class CapacityDataProcessor implements
TableFunctionDataProcessor {
private final long size;
private long currentStartIndex = 0;
private long curIndex = 0;
private long windowIndex = 0;
- public CountDataProcessor(long size) {
+ public CapacityDataProcessor(long size) {
this.size = size;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java
index aee0481cd24..b3290bf6cba 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -103,24 +105,39 @@ public class CumulateTableFunction implements
TableFunction {
.addField("window_start", Type.TIMESTAMP)
.addField("window_end", Type.TIMESTAMP)
.build();
-
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder()
+ .addProperty(STEP_PARAMETER_NAME, step)
+ .addProperty(SIZE_PARAMETER_NAME, size)
+ .addProperty(
+ ORIGIN_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(ORIGIN_PARAMETER_NAME)).getValue())
+ .build();
// outputColumnSchema
return TableFunctionAnalysis.builder()
.properColumnSchema(properColumnSchema)
.requireRecordSnapshot(false)
.requiredColumns(DATA_PARAMETER_NAME,
Collections.singletonList(requiredIndex))
+ .handle(handle)
.build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
return new CumulateDataProcessor(
- (Long) ((ScalarArgument)
arguments.get(ORIGIN_PARAMETER_NAME)).getValue(),
- (Long) ((ScalarArgument)
arguments.get(STEP_PARAMETER_NAME)).getValue(),
- (Long) ((ScalarArgument)
arguments.get(SIZE_PARAMETER_NAME)).getValue());
+ (Long)
+ ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(ORIGIN_PARAMETER_NAME),
+ (Long) ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(STEP_PARAMETER_NAME),
+ (Long) ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(SIZE_PARAMETER_NAME));
}
};
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java
index f621d24e80b..d0b2852a51d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -93,24 +95,43 @@ public class HOPTableFunction implements TableFunction {
.addField("window_start", Type.TIMESTAMP)
.addField("window_end", Type.TIMESTAMP)
.build();
-
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder()
+ .addProperty(
+ ORIGIN_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(ORIGIN_PARAMETER_NAME)).getValue())
+ .addProperty(
+ SLIDE_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(SLIDE_PARAMETER_NAME)).getValue())
+ .addProperty(
+ SIZE_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(SIZE_PARAMETER_NAME)).getValue())
+ .build();
// outputColumnSchema
return TableFunctionAnalysis.builder()
.properColumnSchema(properColumnSchema)
.requireRecordSnapshot(false)
.requiredColumns(DATA_PARAMETER_NAME,
Collections.singletonList(requiredIndex))
+ .handle(handle)
.build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
+ MapTableFunctionHandle mapTableFunctionHandle = (MapTableFunctionHandle)
tableFunctionHandle;
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
return new HOPDataProcessor(
- (Long) ((ScalarArgument)
arguments.get(ORIGIN_PARAMETER_NAME)).getValue(),
- (Long) ((ScalarArgument)
arguments.get(SLIDE_PARAMETER_NAME)).getValue(),
- (Long) ((ScalarArgument)
arguments.get(SIZE_PARAMETER_NAME)).getValue());
+ (Long) mapTableFunctionHandle.getProperty(ORIGIN_PARAMETER_NAME),
+ (Long) mapTableFunctionHandle.getProperty(SLIDE_PARAMETER_NAME),
+ (Long) mapTableFunctionHandle.getProperty(SIZE_PARAMETER_NAME));
}
};
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java
index ca679b82b1f..d4e4c6ad15f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -75,17 +77,30 @@ public class SessionTableFunction implements TableFunction {
.addField("window_end", Type.TIMESTAMP)
.build();
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder()
+ .addProperty(
+ GAP_PARAMETER_NAME, ((ScalarArgument)
arguments.get(GAP_PARAMETER_NAME)).getValue())
+ .build();
// outputColumnSchema
return TableFunctionAnalysis.builder()
.properColumnSchema(properColumnSchema)
.requireRecordSnapshot(false)
.requiredColumns(DATA_PARAMETER_NAME,
Collections.singletonList(requiredIndex))
+ .handle(handle)
.build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
- long gap = (long) ((ScalarArgument)
arguments.get(GAP_PARAMETER_NAME)).getValue();
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
+ long gap =
+ (long) ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(GAP_PARAMETER_NAME);
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java
index 3c7ec080cef..9aa8e2167ab 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -88,22 +90,39 @@ public class TumbleTableFunction implements TableFunction {
.addField("window_end", Type.TIMESTAMP)
.build();
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder()
+ .addProperty(
+ ORIGIN_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(ORIGIN_PARAMETER_NAME)).getValue())
+ .addProperty(
+ SIZE_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(SIZE_PARAMETER_NAME)).getValue())
+ .build();
// outputColumnSchema
return TableFunctionAnalysis.builder()
.properColumnSchema(properColumnSchema)
.requireRecordSnapshot(false)
.requiredColumns(DATA_PARAMETER_NAME,
Collections.singletonList(requiredIndex))
+ .handle(handle)
.build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
return new TumbleDataProcessor(
- (Long) ((ScalarArgument)
arguments.get(ORIGIN_PARAMETER_NAME)).getValue(),
- (Long) ((ScalarArgument)
arguments.get(SIZE_PARAMETER_NAME)).getValue());
+ (Long)
+ ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(ORIGIN_PARAMETER_NAME),
+ (Long) ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(SIZE_PARAMETER_NAME));
}
};
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java
index 263ad5452f7..284f623f301 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import org.apache.iotdb.udf.api.relational.table.argument.Argument;
import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
@@ -76,16 +78,30 @@ public class VariationTableFunction implements
TableFunction {
DescribedSchema properColumnSchema =
new DescribedSchema.Builder().addField("window_index",
Type.INT64).build();
// outputColumnSchema
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder()
+ .addProperty(
+ DELTA_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(DELTA_PARAMETER_NAME)).getValue())
+ .build();
return TableFunctionAnalysis.builder()
.properColumnSchema(properColumnSchema)
.requireRecordSnapshot(false)
.requiredColumns(DATA_PARAMETER_NAME,
Collections.singletonList(requiredIndex))
+ .handle(handle)
.build();
}
@Override
- public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
- double delta = (double) ((ScalarArgument)
arguments.get(DELTA_PARAMETER_NAME)).getValue();
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
+ double delta =
+ (double) ((MapTableFunctionHandle)
tableFunctionHandle).getProperty(DELTA_PARAMETER_NAME);
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {