This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new 4e1ac5a7980 [FLINK-37820][table-planner] Support `AsyncScalarFunction` registration via `CompiledPlan`
4e1ac5a7980 is described below
commit 4e1ac5a7980cbeb4402cd9b4bb7ae788918b0860
Author: Ferenc Csaky <[email protected]>
AuthorDate: Tue May 27 07:21:38 2025 +0200
[FLINK-37820][table-planner] Support `AsyncScalarFunction` registration via `CompiledPlan`
---
.../nodes/exec/serde/RexNodeJsonDeserializer.java | 1 +
.../nodes/exec/serde/RexNodeJsonSerdeTest.java | 32 ++++++++++++++++++++++
2 files changed, 33 insertions(+)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index c5111219283..9966f501551 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -442,6 +442,7 @@ final class RexNodeJsonDeserializer extends
StdDeserializer<RexNode> {
switch (functionInstance.getKind()) {
case SCALAR:
+ case ASYNC_SCALAR:
case TABLE:
return BridgingSqlFunction.of(
serdeContext.getFlinkContext(),
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
index 489f10a6123..c8b618823d7 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.ContextResolvedFunction;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.AsyncScalarFunction;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.functions.FunctionKind;
@@ -80,6 +81,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
@@ -104,15 +106,25 @@ public class RexNodeJsonSerdeTest {
new FlinkTypeFactory(
RexNodeJsonSerdeTest.class.getClassLoader(),
FlinkTypeSystem.INSTANCE);
private static final String FUNCTION_NAME = "MyFunc";
+ private static final String ASYNC_FUNCTION_NAME = "MyAsyncFunc";
private static final FunctionIdentifier FUNCTION_SYS_ID =
FunctionIdentifier.of(FUNCTION_NAME);
private static final FunctionIdentifier FUNCTION_CAT_ID =
FunctionIdentifier.of(
ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE,
FUNCTION_NAME));
+ private static final FunctionIdentifier ASYNC_FUNCTION_CAT_ID =
+ FunctionIdentifier.of(
+ ObjectIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE,
ASYNC_FUNCTION_NAME));
private static final UnresolvedIdentifier UNRESOLVED_FUNCTION_CAT_ID =
UnresolvedIdentifier.of(FUNCTION_CAT_ID.toList());
+ private static final UnresolvedIdentifier UNRESOLVED_ASYNC_FUNCTION_CAT_ID
=
+ UnresolvedIdentifier.of(ASYNC_FUNCTION_CAT_ID.toList());
private static final SerializableScalarFunction SER_UDF_IMPL = new
SerializableScalarFunction();
+ private static final SerializableAsyncScalarFunction SER_ASYNC_UDF_IMPL =
+ new SerializableAsyncScalarFunction();
private static final Class<SerializableScalarFunction> SER_UDF_CLASS =
SerializableScalarFunction.class;
+ private static final Class<SerializableAsyncScalarFunction>
SER_ASYNC_UDF_CLASS =
+ SerializableAsyncScalarFunction.class;
private static final OtherSerializableScalarFunction SER_UDF_IMPL_OTHER =
new OtherSerializableScalarFunction();
private static final Class<OtherSerializableScalarFunction>
SER_UDF_CLASS_OTHER =
@@ -140,6 +152,12 @@ public class RexNodeJsonSerdeTest {
createFunctionCall(serdeContext,
ContextResolvedFunction.anonymous(SER_UDF_IMPL)),
RexNode.class);
+ // Serializable async function
+ testJsonRoundTrip(
+ createFunctionCall(
+ serdeContext,
ContextResolvedFunction.anonymous(SER_ASYNC_UDF_IMPL)),
+ RexNode.class);
+
// Non-serializable function due to fields
assertThatThrownBy(
() ->
@@ -732,6 +750,11 @@ public class RexNodeJsonSerdeTest {
.getFlinkContext()
.getFunctionCatalog()
.registerCatalogFunction(UNRESOLVED_FUNCTION_CAT_ID,
SER_UDF_CLASS, false);
+ serdeContext
+ .getFlinkContext()
+ .getFunctionCatalog()
+ .registerCatalogFunction(
+ UNRESOLVED_ASYNC_FUNCTION_CAT_ID, SER_ASYNC_UDF_CLASS,
false);
return serdeContext;
}
@@ -834,6 +857,15 @@ public class RexNodeJsonSerdeTest {
}
}
+ /** Serializable async function. */
+ public static class SerializableAsyncScalarFunction extends
AsyncScalarFunction {
+
+ @SuppressWarnings("unused")
+ public void eval(CompletableFuture<String> res, Integer i) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
/** Non-serializable function. */
public static class NonSerializableScalarFunction extends ScalarFunction {
@SuppressWarnings({"FieldCanBeLocal", "unused"})