This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.2 by this push:
new 05ee0005a0f [FLINK-38957][table-planner] Support
`ProcessTableFunction` registration via `CompiledPlan`
05ee0005a0f is described below
commit 05ee0005a0f7dc09ece607e09cd36617eaf15382
Author: Ferenc Csaky <[email protected]>
AuthorDate: Wed Feb 4 16:03:15 2026 +0100
[FLINK-38957][table-planner] Support `ProcessTableFunction` registration
via `CompiledPlan`
---
.../nodes/exec/serde/RexNodeJsonDeserializer.java | 1 +
.../nodes/exec/serde/RexNodeJsonSerdeTest.java | 86 +++++++++++++++++++++-
2 files changed, 86 insertions(+), 1 deletion(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index f06d2bfcfd4..f3854850623 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -472,6 +472,7 @@ final class RexNodeJsonDeserializer extends
StdDeserializer<RexNode> {
case SCALAR:
case ASYNC_SCALAR:
case TABLE:
+ case PROCESS_TABLE:
return BridgingSqlFunction.of(
serdeContext.getFlinkContext(),
serdeContext.getTypeFactory(),
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
index 76b0066256b..ad8583a23a5 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.functions.AsyncScalarFunction;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
@@ -113,6 +114,7 @@ public class RexNodeJsonSerdeTest {
RexNodeJsonSerdeTest.class.getClassLoader(),
FlinkTypeSystem.INSTANCE);
private static final String FUNCTION_NAME = "MyFunc";
private static final String ASYNC_FUNCTION_NAME = "MyAsyncFunc";
+ private static final String PROCESS_TABLE_FUNCTION_NAME =
"MyProcessTableFunc";
private static final FunctionIdentifier FUNCTION_SYS_ID =
FunctionIdentifier.of(FUNCTION_NAME);
private static final FunctionIdentifier FUNCTION_CAT_ID =
FunctionIdentifier.of(
@@ -120,23 +122,35 @@ public class RexNodeJsonSerdeTest {
private static final FunctionIdentifier ASYNC_FUNCTION_CAT_ID =
FunctionIdentifier.of(
ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE,
ASYNC_FUNCTION_NAME));
+ private static final FunctionIdentifier PROCESS_TABLE_FUNCTION_CAT_ID =
+ FunctionIdentifier.of(
+ ObjectIdentifier.of(
+ DEFAULT_CATALOG, DEFAULT_DATABASE,
PROCESS_TABLE_FUNCTION_NAME));
private static final UnresolvedIdentifier UNRESOLVED_FUNCTION_CAT_ID =
UnresolvedIdentifier.of(FUNCTION_CAT_ID.toList());
private static final UnresolvedIdentifier UNRESOLVED_ASYNC_FUNCTION_CAT_ID
=
UnresolvedIdentifier.of(ASYNC_FUNCTION_CAT_ID.toList());
+ private static final UnresolvedIdentifier
UNRESOLVED_PROCESS_TABLE_FUNCTION_CAT_ID =
+ UnresolvedIdentifier.of(PROCESS_TABLE_FUNCTION_CAT_ID.toList());
private static final SerializableScalarFunction SER_UDF_IMPL = new
SerializableScalarFunction();
private static final SerializableAsyncScalarFunction SER_ASYNC_UDF_IMPL =
new SerializableAsyncScalarFunction();
+ private static final SerializableProcessTableFunction SER_PTF_UDF_IMPL =
+ new SerializableProcessTableFunction();
private static final Class<SerializableScalarFunction> SER_UDF_CLASS =
SerializableScalarFunction.class;
private static final Class<SerializableAsyncScalarFunction>
SER_ASYNC_UDF_CLASS =
SerializableAsyncScalarFunction.class;
+ private static final Class<SerializableProcessTableFunction>
SER_PTF_UDF_CLASS =
+ SerializableProcessTableFunction.class;
private static final OtherSerializableScalarFunction SER_UDF_IMPL_OTHER =
new OtherSerializableScalarFunction();
private static final Class<OtherSerializableScalarFunction>
SER_UDF_CLASS_OTHER =
OtherSerializableScalarFunction.class;
private static final NonSerializableScalarFunction NON_SER_UDF_IMPL =
new NonSerializableScalarFunction(true);
+ private static final NonSerializableProcessTableFunction
NON_SER_PTF_UDF_IMPL =
+ new NonSerializableProcessTableFunction(true);
private static final NonSerializableFunctionDefinition
NON_SER_FUNCTION_DEF_IMPL =
new NonSerializableFunctionDefinition();
private static final ContextResolvedFunction PERMANENT_FUNCTION =
@@ -180,7 +194,13 @@ public class RexNodeJsonSerdeTest {
serdeContext,
ContextResolvedFunction.anonymous(SER_ASYNC_UDF_IMPL)),
RexNode.class);
- // Non-serializable function due to fields
+ // Serializable process table function
+ testJsonRoundTrip(
+ createFunctionCall(
+ serdeContext,
ContextResolvedFunction.anonymous(SER_PTF_UDF_IMPL)),
+ RexNode.class);
+
+ // Non-serializable scalar function due to fields
assertThatThrownBy(
() ->
toJson(
@@ -193,6 +213,20 @@ public class RexNodeJsonSerdeTest {
anyCauseMatches(
TableException.class,
"The function's implementation class must not
be stateful"));
+
+ // Non-serializable process table function due to fields
+ assertThatThrownBy(
+ () ->
+ toJson(
+ serdeContext,
+ createFunctionCall(
+ serdeContext,
+
ContextResolvedFunction.anonymous(
+
NON_SER_PTF_UDF_IMPL))))
+ .satisfies(
+ anyCauseMatches(
+ TableException.class,
+ "The function's implementation class must not
be stateful"));
}
@Test
@@ -863,6 +897,14 @@ public class RexNodeJsonSerdeTest {
UNRESOLVED_ASYNC_FUNCTION_CAT_ID,
FunctionDescriptor.forFunctionClass(SER_ASYNC_UDF_CLASS).build(),
false);
+ serdeContext
+ .getFlinkContext()
+ .getFunctionCatalog()
+ .registerCatalogFunction(
+ UNRESOLVED_PROCESS_TABLE_FUNCTION_CAT_ID,
+
FunctionDescriptor.forFunctionClass(SER_PTF_UDF_CLASS).build(),
+ false);
+
return serdeContext;
}
@@ -977,6 +1019,24 @@ public class RexNodeJsonSerdeTest {
}
}
+ /** Serializable process table function. */
+ public static class SerializableProcessTableFunction extends
ProcessTableFunction<String> {
+
+ @SuppressWarnings("unused")
+ public void eval(Integer i) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+ return TypeInference.newBuilder()
+ .typedArguments(DataTypes.INT())
+
.outputTypeStrategy(TypeStrategies.explicit(DataTypes.STRING()))
+ .disableSystemArguments(true)
+ .build();
+ }
+ }
+
/** Non-serializable function. */
public static class NonSerializableScalarFunction extends ScalarFunction {
@SuppressWarnings({"FieldCanBeLocal", "unused"})
@@ -992,6 +1052,30 @@ public class RexNodeJsonSerdeTest {
}
}
+ /** Non-serializable process table function. */
+ public static class NonSerializableProcessTableFunction extends
ProcessTableFunction<String> {
+ @SuppressWarnings({"FieldCanBeLocal", "unused"})
+ private final boolean flag;
+
+ public NonSerializableProcessTableFunction(boolean flag) {
+ this.flag = flag;
+ }
+
+ @SuppressWarnings("unused")
+ public void eval(Integer i) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+ return TypeInference.newBuilder()
+ .typedArguments(DataTypes.INT())
+
.outputTypeStrategy(TypeStrategies.explicit(DataTypes.STRING()))
+ .disableSystemArguments(true)
+ .build();
+ }
+ }
+
/** Non-serializable function definition. */
public static class NonSerializableFunctionDefinition implements
FunctionDefinition {