This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.2 by this push:
     new 05ee0005a0f [FLINK-38957][table-planner] Support `ProcessTableFunction` registration via `CompiledPlan`
05ee0005a0f is described below

commit 05ee0005a0f7dc09ece607e09cd36617eaf15382
Author: Ferenc Csaky <[email protected]>
AuthorDate: Wed Feb 4 16:03:15 2026 +0100

    [FLINK-38957][table-planner] Support `ProcessTableFunction` registration via `CompiledPlan`
---
 .../nodes/exec/serde/RexNodeJsonDeserializer.java  |  1 +
 .../nodes/exec/serde/RexNodeJsonSerdeTest.java     | 86 +++++++++++++++++++++-
 2 files changed, 86 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index f06d2bfcfd4..f3854850623 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -472,6 +472,7 @@ final class RexNodeJsonDeserializer extends StdDeserializer<RexNode> {
             case SCALAR:
             case ASYNC_SCALAR:
             case TABLE:
+            case PROCESS_TABLE:
                 return BridgingSqlFunction.of(
                         serdeContext.getFlinkContext(),
                         serdeContext.getTypeFactory(),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
index 76b0066256b..ad8583a23a5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.functions.AsyncScalarFunction;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.ProcessTableFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.module.Module;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
@@ -113,6 +114,7 @@ public class RexNodeJsonSerdeTest {
                     RexNodeJsonSerdeTest.class.getClassLoader(), FlinkTypeSystem.INSTANCE);
     private static final String FUNCTION_NAME = "MyFunc";
     private static final String ASYNC_FUNCTION_NAME = "MyAsyncFunc";
+    private static final String PROCESS_TABLE_FUNCTION_NAME = "MyProcessTableFunc";
     private static final FunctionIdentifier FUNCTION_SYS_ID = FunctionIdentifier.of(FUNCTION_NAME);
     private static final FunctionIdentifier FUNCTION_CAT_ID =
             FunctionIdentifier.of(
@@ -120,23 +122,35 @@ public class RexNodeJsonSerdeTest {
     private static final FunctionIdentifier ASYNC_FUNCTION_CAT_ID =
             FunctionIdentifier.of(
                     ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, ASYNC_FUNCTION_NAME));
+    private static final FunctionIdentifier PROCESS_TABLE_FUNCTION_CAT_ID =
+            FunctionIdentifier.of(
+                    ObjectIdentifier.of(
+                            DEFAULT_CATALOG, DEFAULT_DATABASE, PROCESS_TABLE_FUNCTION_NAME));
     private static final UnresolvedIdentifier UNRESOLVED_FUNCTION_CAT_ID =
             UnresolvedIdentifier.of(FUNCTION_CAT_ID.toList());
     private static final UnresolvedIdentifier UNRESOLVED_ASYNC_FUNCTION_CAT_ID =
             UnresolvedIdentifier.of(ASYNC_FUNCTION_CAT_ID.toList());
+    private static final UnresolvedIdentifier UNRESOLVED_PROCESS_TABLE_FUNCTION_CAT_ID =
+            UnresolvedIdentifier.of(PROCESS_TABLE_FUNCTION_CAT_ID.toList());
     private static final SerializableScalarFunction SER_UDF_IMPL = new SerializableScalarFunction();
     private static final SerializableAsyncScalarFunction SER_ASYNC_UDF_IMPL =
             new SerializableAsyncScalarFunction();
+    private static final SerializableProcessTableFunction SER_PTF_UDF_IMPL =
+            new SerializableProcessTableFunction();
     private static final Class<SerializableScalarFunction> SER_UDF_CLASS =
             SerializableScalarFunction.class;
     private static final Class<SerializableAsyncScalarFunction> SER_ASYNC_UDF_CLASS =
             SerializableAsyncScalarFunction.class;
+    private static final Class<SerializableProcessTableFunction> SER_PTF_UDF_CLASS =
+            SerializableProcessTableFunction.class;
     private static final OtherSerializableScalarFunction SER_UDF_IMPL_OTHER =
             new OtherSerializableScalarFunction();
     private static final Class<OtherSerializableScalarFunction> SER_UDF_CLASS_OTHER =
             OtherSerializableScalarFunction.class;
     private static final NonSerializableScalarFunction NON_SER_UDF_IMPL =
             new NonSerializableScalarFunction(true);
+    private static final NonSerializableProcessTableFunction NON_SER_PTF_UDF_IMPL =
+            new NonSerializableProcessTableFunction(true);
     private static final NonSerializableFunctionDefinition NON_SER_FUNCTION_DEF_IMPL =
             new NonSerializableFunctionDefinition();
     private static final ContextResolvedFunction PERMANENT_FUNCTION =
@@ -180,7 +194,13 @@ public class RexNodeJsonSerdeTest {
                         serdeContext, ContextResolvedFunction.anonymous(SER_ASYNC_UDF_IMPL)),
                 RexNode.class);
 
-        // Non-serializable function due to fields
+        // Serializable process table function
+        testJsonRoundTrip(
+                createFunctionCall(
+                        serdeContext, ContextResolvedFunction.anonymous(SER_PTF_UDF_IMPL)),
+                RexNode.class);
+
+        // Non-serializable scalar function due to fields
         assertThatThrownBy(
                         () ->
                                 toJson(
@@ -193,6 +213,20 @@ public class RexNodeJsonSerdeTest {
                         anyCauseMatches(
                                 TableException.class,
                                 "The function's implementation class must not be stateful"));
+
+        // Non-serializable process table function due to fields
+        assertThatThrownBy(
+                        () ->
+                                toJson(
+                                        serdeContext,
+                                        createFunctionCall(
+                                                serdeContext,
+                                                ContextResolvedFunction.anonymous(
+                                                        NON_SER_PTF_UDF_IMPL))))
+                .satisfies(
+                        anyCauseMatches(
+                                TableException.class,
+                                "The function's implementation class must not be stateful"));
     }
 
     @Test
@@ -863,6 +897,14 @@ public class RexNodeJsonSerdeTest {
                         UNRESOLVED_ASYNC_FUNCTION_CAT_ID,
                         FunctionDescriptor.forFunctionClass(SER_ASYNC_UDF_CLASS).build(),
                         false);
+        serdeContext
+                .getFlinkContext()
+                .getFunctionCatalog()
+                .registerCatalogFunction(
+                        UNRESOLVED_PROCESS_TABLE_FUNCTION_CAT_ID,
+                        FunctionDescriptor.forFunctionClass(SER_PTF_UDF_CLASS).build(),
+                        false);
+
         return serdeContext;
     }
 
@@ -977,6 +1019,24 @@ public class RexNodeJsonSerdeTest {
         }
     }
 
+    /** Serializable process table function. */
+    public static class SerializableProcessTableFunction extends ProcessTableFunction<String> {
+
+        @SuppressWarnings("unused")
+        public void eval(Integer i) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+            return TypeInference.newBuilder()
+                    .typedArguments(DataTypes.INT())
+                    .outputTypeStrategy(TypeStrategies.explicit(DataTypes.STRING()))
+                    .disableSystemArguments(true)
+                    .build();
+        }
+    }
+
     /** Non-serializable function. */
     public static class NonSerializableScalarFunction extends ScalarFunction {
         @SuppressWarnings({"FieldCanBeLocal", "unused"})
@@ -992,6 +1052,30 @@ public class RexNodeJsonSerdeTest {
         }
     }
 
+    /** Non-serializable process table function. */
+    public static class NonSerializableProcessTableFunction extends ProcessTableFunction<String> {
+        @SuppressWarnings({"FieldCanBeLocal", "unused"})
+        private final boolean flag;
+
+        public NonSerializableProcessTableFunction(boolean flag) {
+            this.flag = flag;
+        }
+
+        @SuppressWarnings("unused")
+        public void eval(Integer i) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+            return TypeInference.newBuilder()
+                    .typedArguments(DataTypes.INT())
+                    .outputTypeStrategy(TypeStrategies.explicit(DataTypes.STRING()))
+                    .disableSystemArguments(true)
+                    .build();
+        }
+    }
+
     /** Non-serializable function definition. */
     public static class NonSerializableFunctionDefinition implements FunctionDefinition {
 

Reply via email to