This is an automated email from the ASF dual-hosted git repository. fcsaky pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push: new 5a8a085516d [FLINK-37820][table-planner] Support `AsyncScalarFunction` registration via `CompiledPlan` 5a8a085516d is described below commit 5a8a085516de889abd00cc074a1dff3bf5bd3607 Author: Ferenc Csaky <fcs...@apache.org> AuthorDate: Tue May 27 07:21:15 2025 +0200 [FLINK-37820][table-planner] Support `AsyncScalarFunction` registration via `CompiledPlan` --- .../nodes/exec/serde/RexNodeJsonDeserializer.java | 1 + .../nodes/exec/serde/RexNodeJsonSerdeTest.java | 32 ++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java index c5111219283..9966f501551 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java @@ -442,6 +442,7 @@ final class RexNodeJsonDeserializer extends StdDeserializer<RexNode> { switch (functionInstance.getKind()) { case SCALAR: + case ASYNC_SCALAR: case TABLE: return BridgingSqlFunction.of( serdeContext.getFlinkContext(), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java index 489f10a6123..c8b618823d7 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java @@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.ContextResolvedFunction; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.functions.AsyncScalarFunction; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.FunctionIdentifier; import org.apache.flink.table.functions.FunctionKind; @@ -80,6 +81,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; @@ -104,15 +106,25 @@ public class RexNodeJsonSerdeTest { new FlinkTypeFactory( RexNodeJsonSerdeTest.class.getClassLoader(), FlinkTypeSystem.INSTANCE); private static final String FUNCTION_NAME = "MyFunc"; + private static final String ASYNC_FUNCTION_NAME = "MyAsyncFunc"; private static final FunctionIdentifier FUNCTION_SYS_ID = FunctionIdentifier.of(FUNCTION_NAME); private static final FunctionIdentifier FUNCTION_CAT_ID = FunctionIdentifier.of( ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, FUNCTION_NAME)); + private static final FunctionIdentifier ASYNC_FUNCTION_CAT_ID = + FunctionIdentifier.of( + ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, ASYNC_FUNCTION_NAME)); private static final UnresolvedIdentifier UNRESOLVED_FUNCTION_CAT_ID = UnresolvedIdentifier.of(FUNCTION_CAT_ID.toList()); + private static final UnresolvedIdentifier UNRESOLVED_ASYNC_FUNCTION_CAT_ID = + UnresolvedIdentifier.of(ASYNC_FUNCTION_CAT_ID.toList()); private static final SerializableScalarFunction SER_UDF_IMPL = new SerializableScalarFunction(); + private static final SerializableAsyncScalarFunction SER_ASYNC_UDF_IMPL = + new SerializableAsyncScalarFunction(); private static final Class<SerializableScalarFunction> SER_UDF_CLASS = SerializableScalarFunction.class; + private static final Class<SerializableAsyncScalarFunction> SER_ASYNC_UDF_CLASS = + SerializableAsyncScalarFunction.class; private static final OtherSerializableScalarFunction SER_UDF_IMPL_OTHER = new OtherSerializableScalarFunction(); private static final Class<OtherSerializableScalarFunction> SER_UDF_CLASS_OTHER = @@ -140,6 +152,12 @@ public class RexNodeJsonSerdeTest { createFunctionCall(serdeContext, ContextResolvedFunction.anonymous(SER_UDF_IMPL)), RexNode.class); + // Serializable async function + testJsonRoundTrip( + createFunctionCall( + serdeContext, ContextResolvedFunction.anonymous(SER_ASYNC_UDF_IMPL)), + RexNode.class); + // Non-serializable function due to fields assertThatThrownBy( () -> @@ -732,6 +750,11 @@ public class RexNodeJsonSerdeTest { .getFlinkContext() .getFunctionCatalog() .registerCatalogFunction(UNRESOLVED_FUNCTION_CAT_ID, SER_UDF_CLASS, false); + serdeContext + .getFlinkContext() + .getFunctionCatalog() + .registerCatalogFunction( + UNRESOLVED_ASYNC_FUNCTION_CAT_ID, SER_ASYNC_UDF_CLASS, false); return serdeContext; } @@ -834,6 +857,15 @@ public class RexNodeJsonSerdeTest { } } + /** Serializable async function. */ + public static class SerializableAsyncScalarFunction extends AsyncScalarFunction { + + @SuppressWarnings("unused") + public void eval(CompletableFuture<String> res, Integer i) { + throw new UnsupportedOperationException(); + } + } + /** Non-serializable function. */ public static class NonSerializableScalarFunction extends ScalarFunction { @SuppressWarnings({"FieldCanBeLocal", "unused"})