This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.0 by this push:
     new 5a8a085516d [FLINK-37820][table-planner] Support `AsyncScalarFunction` 
registration via `CompiledPlan`
5a8a085516d is described below

commit 5a8a085516de889abd00cc074a1dff3bf5bd3607
Author: Ferenc Csaky <fcs...@apache.org>
AuthorDate: Tue May 27 07:21:15 2025 +0200

    [FLINK-37820][table-planner] Support `AsyncScalarFunction` registration via 
`CompiledPlan`
---
 .../nodes/exec/serde/RexNodeJsonDeserializer.java  |  1 +
 .../nodes/exec/serde/RexNodeJsonSerdeTest.java     | 32 ++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index c5111219283..9966f501551 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -442,6 +442,7 @@ final class RexNodeJsonDeserializer extends 
StdDeserializer<RexNode> {
 
         switch (functionInstance.getKind()) {
             case SCALAR:
+            case ASYNC_SCALAR:
             case TABLE:
                 return BridgingSqlFunction.of(
                         serdeContext.getFlinkContext(),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
index 489f10a6123..c8b618823d7 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.ContextResolvedFunction;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.AsyncScalarFunction;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.functions.FunctionKind;
@@ -80,6 +81,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
@@ -104,15 +106,25 @@ public class RexNodeJsonSerdeTest {
             new FlinkTypeFactory(
                     RexNodeJsonSerdeTest.class.getClassLoader(), 
FlinkTypeSystem.INSTANCE);
     private static final String FUNCTION_NAME = "MyFunc";
+    private static final String ASYNC_FUNCTION_NAME = "MyAsyncFunc";
     private static final FunctionIdentifier FUNCTION_SYS_ID = 
FunctionIdentifier.of(FUNCTION_NAME);
     private static final FunctionIdentifier FUNCTION_CAT_ID =
             FunctionIdentifier.of(
                     ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, 
FUNCTION_NAME));
+    private static final FunctionIdentifier ASYNC_FUNCTION_CAT_ID =
+            FunctionIdentifier.of(
+                    ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, 
ASYNC_FUNCTION_NAME));
     private static final UnresolvedIdentifier UNRESOLVED_FUNCTION_CAT_ID =
             UnresolvedIdentifier.of(FUNCTION_CAT_ID.toList());
+    private static final UnresolvedIdentifier UNRESOLVED_ASYNC_FUNCTION_CAT_ID 
=
+            UnresolvedIdentifier.of(ASYNC_FUNCTION_CAT_ID.toList());
     private static final SerializableScalarFunction SER_UDF_IMPL = new 
SerializableScalarFunction();
+    private static final SerializableAsyncScalarFunction SER_ASYNC_UDF_IMPL =
+            new SerializableAsyncScalarFunction();
     private static final Class<SerializableScalarFunction> SER_UDF_CLASS =
             SerializableScalarFunction.class;
+    private static final Class<SerializableAsyncScalarFunction> 
SER_ASYNC_UDF_CLASS =
+            SerializableAsyncScalarFunction.class;
     private static final OtherSerializableScalarFunction SER_UDF_IMPL_OTHER =
             new OtherSerializableScalarFunction();
     private static final Class<OtherSerializableScalarFunction> 
SER_UDF_CLASS_OTHER =
@@ -140,6 +152,12 @@ public class RexNodeJsonSerdeTest {
                 createFunctionCall(serdeContext, 
ContextResolvedFunction.anonymous(SER_UDF_IMPL)),
                 RexNode.class);
 
+        // Serializable async function
+        testJsonRoundTrip(
+                createFunctionCall(
+                        serdeContext, 
ContextResolvedFunction.anonymous(SER_ASYNC_UDF_IMPL)),
+                RexNode.class);
+
         // Non-serializable function due to fields
         assertThatThrownBy(
                         () ->
@@ -732,6 +750,11 @@ public class RexNodeJsonSerdeTest {
                 .getFlinkContext()
                 .getFunctionCatalog()
                 .registerCatalogFunction(UNRESOLVED_FUNCTION_CAT_ID, 
SER_UDF_CLASS, false);
+        serdeContext
+                .getFlinkContext()
+                .getFunctionCatalog()
+                .registerCatalogFunction(
+                        UNRESOLVED_ASYNC_FUNCTION_CAT_ID, SER_ASYNC_UDF_CLASS, 
false);
         return serdeContext;
     }
 
@@ -834,6 +857,15 @@ public class RexNodeJsonSerdeTest {
         }
     }
 
+    /** Serializable async function. */
+    public static class SerializableAsyncScalarFunction extends 
AsyncScalarFunction {
+
+        @SuppressWarnings("unused")
+        public void eval(CompletableFuture<String> res, Integer i) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
     /** Non-serializable function. */
     public static class NonSerializableScalarFunction extends ScalarFunction {
         @SuppressWarnings({"FieldCanBeLocal", "unused"})

Reply via email to