This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4264c30d367 [FLINK-34095] Adds restore tests for StreamExecAsyncCalc 
(#24220)
4264c30d367 is described below

commit 4264c30d367c1847d702ea5438da64876559a95d
Author: Alan Sheinberg <[email protected]>
AuthorDate: Tue Feb 6 09:53:36 2024 -0800

    [FLINK-34095] Adds restore tests for StreamExecAsyncCalc (#24220)
---
 .../converter/FunctionDefinitionConvertRule.java   |   1 +
 .../nodes/exec/stream/AsyncCalcRestoreTest.java    |  43 ++
 .../nodes/exec/stream/AsyncCalcTestPrograms.java   | 297 +++++++++++
 .../plan/async-calc-complex.json                   | 564 +++++++++++++++++++++
 .../async-calc-complex/savepoint/_metadata         | Bin 0 -> 28698 bytes
 .../plan/async-calc-condition.json                 | 186 +++++++
 .../async-calc-condition/savepoint/_metadata       | Bin 0 -> 7337 bytes
 .../plan/async-calc-failure-exception.json         | 104 ++++
 .../savepoint/_metadata                            | Bin 0 -> 6866 bytes
 .../async-calc-nested/plan/async-calc-nested.json  | 226 +++++++++
 .../async-calc-nested/savepoint/_metadata          | Bin 0 -> 11243 bytes
 .../async-calc-simple/plan/async-calc-simple.json  | 146 ++++++
 .../async-calc-simple/savepoint/_metadata          | Bin 0 -> 8428 bytes
 13 files changed, 1567 insertions(+)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/FunctionDefinitionConvertRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/FunctionDefinitionConvertRule.java
index ab7f34114e1..76407f8d778 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/FunctionDefinitionConvertRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/FunctionDefinitionConvertRule.java
@@ -56,6 +56,7 @@ public class FunctionDefinitionConvertRule implements 
CallExpressionConvertRule
 
         switch (definition.getKind()) {
             case SCALAR:
+            case ASYNC_SCALAR:
             case TABLE:
                 final List<RexNode> args =
                         call.getChildren().stream()
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/AsyncCalcRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/AsyncCalcRestoreTest.java
new file mode 100644
index 00000000000..b7a6ef515a6
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/AsyncCalcRestoreTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecAsyncCalc}, driven by {@link AsyncCalcTestPrograms}. */
+public class AsyncCalcRestoreTest extends RestoreTestBase {
+
+    public AsyncCalcRestoreTest() {
+        super(StreamExecCalc.class); // NOTE(review): Javadoc names StreamExecAsyncCalc - confirm
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(
+                AsyncCalcTestPrograms.ASYNC_CALC_UDF_SIMPLE,
+                AsyncCalcTestPrograms.ASYNC_CALC_UDF_COMPLEX,
+                AsyncCalcTestPrograms.ASYNC_CALC_UDF_NESTED,
+                AsyncCalcTestPrograms.ASYNC_CALC_UDF_CONDITION,
+                AsyncCalcTestPrograms.ASYNC_CALC_UDF_FAILURE_EXCEPTION);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/AsyncCalcTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/AsyncCalcTestPrograms.java
new file mode 100644
index 00000000000..badbb4108e2
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/AsyncCalcTestPrograms.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.TimeZone;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY;
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_MAX_ATTEMPTS;
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_RETRY_DELAY;
+import static org.assertj.core.api.Assertions.fail;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecAsyncCalc}. */
+public class AsyncCalcTestPrograms {
+
+    static final TableTestProgram ASYNC_CALC_UDF_SIMPLE =
+            TableTestProgram.of("async-calc-simple", "validates async calc 
node with simple UDF")
+                    .setupTemporaryCatalogFunction("udf1", 
AsyncJavaFunc0.class)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT")
+                                    .producedBeforeRestore(Row.of(5))
+                                    .producedAfterRestore(Row.of(5))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT", "a1 BIGINT")
+                                    .consumedBeforeRestore(Row.of(5, 6L)) // udf1(5) = 5 + 1
+                                    .consumedAfterRestore(Row.of(5, 6L))
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, udf1(a) FROM 
source_t")
+                    .build();
+
+    static final TableTestProgram ASYNC_CALC_UDF_COMPLEX =
+            TableTestProgram.of("async-calc-complex", "validates calc node 
with complex UDFs")
+                    .setupTemporaryCatalogFunction("udf1", 
AsyncJavaFunc0.class)
+                    .setupTemporaryCatalogFunction("udf2", 
AsyncJavaFunc1.class)
+                    .setupTemporarySystemFunction("udf3", AsyncJavaFunc2.class)
+                    .setupTemporarySystemFunction("udf4", 
AsyncUdfWithOpen.class)
+                    .setupCatalogFunction("udf5", AsyncJavaFunc5.class)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(
+                                            "a BIGINT, b INT NOT NULL, c 
VARCHAR, d TIMESTAMP(3)")
+                                    .producedBeforeRestore(
+                                            Row.of(
+                                                    5L,
+                                                    11,
+                                                    "hello world",
+                                                    LocalDateTime.of(2023, 12, 
16, 1, 1, 1, 123)))
+                                    .producedAfterRestore(
+                                            Row.of(
+                                                    5L,
+                                                    11,
+                                                    "hello world",
+                                                    LocalDateTime.of(2023, 12, 
16, 1, 1, 1, 123)))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "a BIGINT",
+                                            "a1 VARCHAR",
+                                            "b INT NOT NULL",
+                                            "b1 VARCHAR",
+                                            "c1 VARCHAR",
+                                            "c2 VARCHAR",
+                                            "d1 TIMESTAMP(3)")
+                                    .consumedBeforeRestore(
+                                            Row.of(
+                                                    5L,
+                                                    "5",
+                                                    11,
+                                                    "11 and 11 and 
1702688461000",
+                                                    "hello world11",
+                                                    "$hello",
+                                                    LocalDateTime.of(2023, 12, 
16, 01, 01, 00, 0)))
+                                    .consumedAfterRestore(
+                                            Row.of(
+                                                    5L,
+                                                    "5",
+                                                    11,
+                                                    "11 and 11 and 
1702688461000",
+                                                    "hello world11",
+                                                    "$hello",
+                                                    LocalDateTime.of(2023, 12, 
16, 01, 01, 00, 0)))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "a, "
+                                    + "cast(a as VARCHAR) as a1, "
+                                    + "b, "
+                                    + "udf2(b, b, d) as b1, "
+                                    + "udf3(c, b) as c1, "
+                                    + "udf4(substring(c, 1, 5)) as c2, "
+                                    + "udf5(d, 1000) as d1 "
+                                    + "from source_t where "
+                                    + "(udf1(a) > 0 or (a * b) < 100) and b > 
10")
+                    .build();
+
+    static final TableTestProgram ASYNC_CALC_UDF_NESTED =
+            TableTestProgram.of(
+                            "async-calc-nested",
+                            "validates async calc node when chained through 
nested calls")
+                    .setupTemporaryCatalogFunction("udf1", 
AsyncJavaFunc0.class)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT")
+                                    .producedBeforeRestore(Row.of(5))
+                                    .producedAfterRestore(Row.of(5))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT", "a1 BIGINT")
+                                    .consumedBeforeRestore(Row.of(5, 8L)) // udf1 x3: 5 + 3
+                                    .consumedAfterRestore(Row.of(5, 8L))
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, udf1(udf1(udf1(a))) 
FROM source_t")
+                    .build();
+
+    static final TableTestProgram ASYNC_CALC_UDF_CONDITION =
+            TableTestProgram.of(
+                            "async-calc-condition",
+                            "validates async calc node with the udf written in 
the condition of the SQL query")
+                    .setupTemporaryCatalogFunction("udf1", 
AsyncJavaFunc0.class)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT")
+                                    .producedBeforeRestore(Row.of(5), 
Row.of(6), Row.of(4))
+                                    .producedAfterRestore(Row.of(7), Row.of(3))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT")
+                                    .consumedBeforeRestore(Row.of(6)) // only 6 has a + 1 > 6
+                                    .consumedAfterRestore(Row.of(7)) // 3 + 1 > 6 is false
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a FROM source_t WHERE 
udf1(a) > 6")
+                    .build();
+
+    static final TableTestProgram ASYNC_CALC_UDF_FAILURE_EXCEPTION =
+            TableTestProgram.of(
+                            "async-calc-failure-exception",
+                            "validates async calc node that fails some number 
of times and then recovers after restore")
+                    .setupConfig(TABLE_EXEC_ASYNC_SCALAR_RETRY_DELAY, 
Duration.ofMillis(3000))
+                    .setupConfig(TABLE_EXEC_ASYNC_SCALAR_MAX_ATTEMPTS, 3)
+                    .setupConfig(TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY, 5)
+                    .setupTemporaryCatalogFunction("udf1", 
TwosFailFunction.class)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT")
+                                    .producedBeforeRestore(Row.of(1), 
Row.of(2))
+                                    .producedAfterRestore(Row.of(3))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT")
+                                    .consumedBeforeRestore(Row.of(1)) // udf1(2) throws at first
+                                    .consumedAfterRestore(Row.of(2), Row.of(3))
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT udf1(a) FROM source_t")
+                    .build();
+
+    /** Completes the future with the input incremented by one. */
+    public static class AsyncJavaFunc0 extends AsyncScalarFunction {
+        public void eval(CompletableFuture<Long> future, Long l) {
+            future.complete(l + 1);
+        }
+    }
+
+    /** Completes with "a and b and ts"; ts is c.getMillisecond(), or null when c is null. */
+    public static class AsyncJavaFunc1 extends AsyncScalarFunction {
+        public void eval(
+                CompletableFuture<String> future,
+                Integer a,
+                int b,
+                @DataTypeHint("TIMESTAMP(3)") TimestampData c) {
+            Long ts = (c == null) ? null : c.getMillisecond();
+            future.complete(a + " and " + b + " and " + ts);
+        }
+    }
+
+    /** Completes with s followed by the product of all values passed in a. */
+    public static class AsyncJavaFunc2 extends AsyncScalarFunction {
+        public void eval(CompletableFuture<String> future, String s, 
Integer... a) {
+            int m = 1; // multiplicative identity, so an empty a appends 1
+            for (int n : a) {
+                m *= n;
+            }
+            future.complete(s + m);
+        }
+    }
+
+    /**
+     * Subtracts a millisecond offset from a timestamp. Tracks open/close calls and
+     * fails in eval if open was not called first.
+     */
+    public static class AsyncJavaFunc5 extends AsyncScalarFunction {
+        // these fields must be reset to false at the beginning of tests,
+        // otherwise the static fields will be changed by several tests concurrently
+        public static boolean openCalled = false;
+        public static boolean closeCalled = false;
+
+        @Override
+        public void open(FunctionContext context) {
+            openCalled = true;
+        }
+
+        public void eval(
+                @DataTypeHint("TIMESTAMP(3)") CompletableFuture<Timestamp> 
future,
+                @DataTypeHint("TIMESTAMP(3)") TimestampData timestampData,
+                Integer offset) {
+            if (!openCalled) {
+                fail("Open was not called before run.");
+            }
+            if (timestampData == null || offset == null) {
+                future.complete(null);
+            } else {
+                long ts = timestampData.getMillisecond() - offset;
+                int tzOffset = TimeZone.getDefault().getOffset(ts); // default-zone offset at ts
+                future.complete(new Timestamp(ts - tzOffset));
+            }
+        }
+
+        @Override
+        public void close() {
+            closeCalled = true;
+        }
+    }
+
+    /** Completes with "$" + c; throws IllegalStateException if open was not called first. */
+    public static class AsyncUdfWithOpen extends AsyncScalarFunction {
+
+        // transient makes this class serializable by class name
+        private transient boolean isOpened = false;
+
+        @Override
+        public void open(FunctionContext context) throws Exception {
+            super.open(context);
+            this.isOpened = true;
+        }
+
+        public void eval(CompletableFuture<String> future, String c) {
+            if (!isOpened) {
+                throw new IllegalStateException("Open method is not called!");
+            }
+            future.complete("$" + c);
+        }
+    }
+
+    /** Throws on the first TOTAL_FAILURES calls with input 2; otherwise completes with c. */
+    public static class TwosFailFunction extends AsyncScalarFunction {
+        private static final int TOTAL_FAILURES = 1;
+
+        private final AtomicInteger calls = new AtomicInteger(0); // counts eval calls with c == 2
+
+        public void eval(CompletableFuture<Integer> future, Integer c) {
+            if (c != 2) {
+                future.complete(c);
+                return;
+            }
+            if (calls.incrementAndGet() > TOTAL_FAILURES) {
+                future.complete(c);
+                return;
+            }
+            throw new RuntimeException("Failure " + calls.get());
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-complex/plan/async-calc-complex.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-complex/plan/async-calc-complex.json
new file mode 100644
index 00000000000..f55924010da
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-complex/plan/async-calc-complex.json
@@ -0,0 +1,564 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, filter=[]]], fields=[a, b, c, d])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-async-calc_1",
+    "configuration" : {
+      "table.exec.async-scalar.buffer-capacity" : "10",
+      "table.exec.async-scalar.max-attempts" : "3",
+      "table.exec.async-scalar.retry-delay" : "100 ms",
+      "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+      "table.exec.async-scalar.timeout" : "3 min"
+    },
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "BIGINT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `d` TIMESTAMP(3), `c` 
VARCHAR(2147483647), `f0` BIGINT>",
+    "description" : "AsyncCalc(select=[a, b, d, c, udf1(a) AS f0])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$SUBSTRING$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : "VARCHAR(2147483647)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 5,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$AND$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$OR$1",
+        "operands" : [ {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$>$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 4,
+            "type" : "BIGINT"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 0,
+            "type" : "INT NOT NULL"
+          } ],
+          "type" : "BOOLEAN"
+        }, {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$<$1",
+          "operands" : [ {
+            "kind" : "CALL",
+            "syntax" : "BINARY",
+            "internalName" : "$*$1",
+            "operands" : [ {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 0,
+              "type" : "BIGINT"
+            }, {
+              "kind" : "INPUT_REF",
+              "inputIndex" : 1,
+              "type" : "INT NOT NULL"
+            } ],
+            "type" : "BIGINT"
+          }, {
+            "kind" : "LITERAL",
+            "value" : 100,
+            "type" : "INT NOT NULL"
+          } ],
+          "type" : "BOOLEAN"
+        } ],
+        "type" : "BOOLEAN"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "BINARY",
+        "internalName" : "$>$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 1,
+          "type" : "INT NOT NULL"
+        }, {
+          "kind" : "LITERAL",
+          "value" : 10,
+          "type" : "INT NOT NULL"
+        } ],
+        "type" : "BOOLEAN NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `d` TIMESTAMP(3), `c` 
VARCHAR(2147483647), `f0` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, b, d, c, SUBSTRING(c, 1, 5) AS f0], 
where=[(((f0 > 0) OR ((a * b) < 100)) AND (b > 10))])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-async-calc_1",
+    "configuration" : {
+      "table.exec.async-scalar.buffer-capacity" : "10",
+      "table.exec.async-scalar.max-attempts" : "3",
+      "table.exec.async-scalar.retry-delay" : "100 ms",
+      "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+      "table.exec.async-scalar.timeout" : "3 min"
+    },
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf2`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT NOT NULL"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "TIMESTAMP(3)"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`f0` VARCHAR(2147483647), `d` TIMESTAMP(3), `f00` VARCHAR(2147483647)>",
+    "description" : "AsyncCalc(select=[a, b, c, f0, d, udf2(b, b, d) AS f00])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-async-calc_1",
+    "configuration" : {
+      "table.exec.async-scalar.buffer-capacity" : "10",
+      "table.exec.async-scalar.max-attempts" : "3",
+      "table.exec.async-scalar.retry-delay" : "100 ms",
+      "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+      "table.exec.async-scalar.timeout" : "3 min"
+    },
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "CALL",
+      "systemName" : "udf3",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
+      }, {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `f00` 
VARCHAR(2147483647), `f0` VARCHAR(2147483647), `d` TIMESTAMP(3), `f01` 
VARCHAR(2147483647)>",
+    "description" : "AsyncCalc(select=[a, b, f00, f0, d, udf3(c, b) AS f01])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-async-calc_1",
+    "configuration" : {
+      "table.exec.async-scalar.buffer-capacity" : "10",
+      "table.exec.async-scalar.max-attempts" : "3",
+      "table.exec.async-scalar.retry-delay" : "100 ms",
+      "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+      "table.exec.async-scalar.timeout" : "3 min"
+    },
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "TIMESTAMP(3)"
+    }, {
+      "kind" : "CALL",
+      "systemName" : "udf4",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `f00` 
VARCHAR(2147483647), `f01` VARCHAR(2147483647), `d` TIMESTAMP(3), `f0` 
VARCHAR(2147483647)>",
+    "description" : "AsyncCalc(select=[a, b, f00, f01, d, udf4(f0) AS f0])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-async-calc_1",
+    "configuration" : {
+      "table.exec.async-scalar.buffer-capacity" : "10",
+      "table.exec.async-scalar.max-attempts" : "3",
+      "table.exec.async-scalar.retry-delay" : "100 ms",
+      "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+      "table.exec.async-scalar.timeout" : "3 min"
+    },
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf5`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 4,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 1000,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `f0` 
VARCHAR(2147483647), `f1` VARCHAR(2147483647), `f2` VARCHAR(2147483647), `f3` 
TIMESTAMP(3)>",
+    "description" : "AsyncCalc(select=[a, b, f00 AS f0, f01 AS f1, f0 AS f2, 
udf5(d, 1000) AS f3])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "BIGINT"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `a1` VARCHAR(2147483647), `b` INT NOT 
NULL, `b1` VARCHAR(2147483647), `c1` VARCHAR(2147483647), `c2` 
VARCHAR(2147483647), `d1` TIMESTAMP(3)>",
+    "description" : "Calc(select=[a, CAST(a AS VARCHAR(2147483647)) AS a1, b, 
f0 AS b1, f1 AS c1, f2 AS c2, f3 AS d1])"
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "a1",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "b1",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c1",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c2",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d1",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `a1` VARCHAR(2147483647), `b` INT NOT 
NULL, `b1` VARCHAR(2147483647), `c1` VARCHAR(2147483647), `c2` 
VARCHAR(2147483647), `d1` TIMESTAMP(3)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, a1, b, b1, c1, c2, d1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-complex/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-complex/savepoint/_metadata
new file mode 100644
index 00000000000..0e75373edb4
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-complex/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-condition/plan/async-calc-condition.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-condition/plan/async-calc-condition.json
new file mode 100644
index 00000000000..225a5805c29
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-condition/plan/async-calc-condition.json
@@ -0,0 +1,186 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 20,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      } ]
+    },
+    "outputType" : "ROW<`a` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, filter=[]]], fields=[a])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 21,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `f0` BIGINT>",
+    "description" : "Calc(select=[a, CAST(a AS BIGINT) AS f0])"
+  }, {
+    "id" : 22,
+    "type" : "stream-exec-async-calc_1",
+    "configuration" : {
+      "table.exec.async-scalar.buffer-capacity" : "10",
+      "table.exec.async-scalar.max-attempts" : "3",
+      "table.exec.async-scalar.retry-delay" : "100 ms",
+      "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+      "table.exec.async-scalar.timeout" : "3 min"
+    },
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "BIGINT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `f0` BIGINT>",
+    "description" : "AsyncCalc(select=[a, udf1(f0) AS f0])"
+  }, {
+    "id" : 23,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$>$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "BIGINT"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 6,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT>",
+    "description" : "Calc(select=[a], where=[(f0 > 6)])"
+  }, {
+    "id" : 24,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a])"
+  } ],
+  "edges" : [ {
+    "source" : 20,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 21,
+    "target" : 22,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 22,
+    "target" : 23,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 23,
+    "target" : 24,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-condition/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-condition/savepoint/_metadata
new file mode 100644
index 00000000000..853248dc450
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-condition/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-failure-exception/plan/async-calc-failure-exception.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-failure-exception/plan/async-calc-failure-exception.json
new file mode 100644
index 00000000000..8d411c695b7
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-failure-exception/plan/async-calc-failure-exception.json
@@ -0,0 +1,104 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 25,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 26,
+    "type" : "stream-exec-async-calc_1",
+    "configuration" : {
+      "table.exec.async-scalar.buffer-capacity" : "5",
+      "table.exec.async-scalar.max-attempts" : "3",
+      "table.exec.async-scalar.retry-delay" : "3 s",
+      "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+      "table.exec.async-scalar.timeout" : "3 min"
+    },
+    "projection" : [ {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "INT"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT>",
+    "description" : "AsyncCalc(select=[udf1(a) AS EXPR$0])"
+  }, {
+    "id" : 27,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`EXPR$0` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[EXPR$0])"
+  } ],
+  "edges" : [ {
+    "source" : 25,
+    "target" : 26,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 26,
+    "target" : 27,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-failure-exception/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-failure-exception/savepoint/_metadata
new file mode 100644
index 00000000000..afdb709fc80
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-failure-exception/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-nested/plan/async-calc-nested.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-nested/plan/async-calc-nested.json
new file mode 100644
index 00000000000..7d1a7ab32f0
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-nested/plan/async-calc-nested.json
@@ -0,0 +1,226 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 14,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 15,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `f0` BIGINT>",
+    "description" : "Calc(select=[a, CAST(a AS BIGINT) AS f0])"
+  }, {
+    "id" : 16,
+    "type" : "stream-exec-async-calc_1",
+    "configuration" : {
+      "table.exec.async-scalar.buffer-capacity" : "10",
+      "table.exec.async-scalar.max-attempts" : "3",
+      "table.exec.async-scalar.retry-delay" : "100 ms",
+      "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+      "table.exec.async-scalar.timeout" : "3 min"
+    },
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "BIGINT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `f0` BIGINT>",
+    "description" : "AsyncCalc(select=[a, udf1(f0) AS f0])"
+  }, {
+    "id" : 17,
+    "type" : "stream-exec-async-calc_1",
+    "configuration" : {
+      "table.exec.async-scalar.buffer-capacity" : "10",
+      "table.exec.async-scalar.max-attempts" : "3",
+      "table.exec.async-scalar.retry-delay" : "100 ms",
+      "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+      "table.exec.async-scalar.timeout" : "3 min"
+    },
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "BIGINT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `f0` BIGINT>",
+    "description" : "AsyncCalc(select=[a, udf1(f0) AS f0])"
+  }, {
+    "id" : 18,
+    "type" : "stream-exec-async-calc_1",
+    "configuration" : {
+      "table.exec.async-scalar.buffer-capacity" : "10",
+      "table.exec.async-scalar.max-attempts" : "3",
+      "table.exec.async-scalar.retry-delay" : "100 ms",
+      "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+      "table.exec.async-scalar.timeout" : "3 min"
+    },
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "BIGINT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT>",
+    "description" : "AsyncCalc(select=[a, udf1(f0) AS EXPR$1])"
+  }, {
+    "id" : 19,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "a1",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 15,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 16,
+    "target" : 17,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 17,
+    "target" : 18,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 18,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-nested/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-nested/savepoint/_metadata
new file mode 100644
index 00000000000..c5db86ce208
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-nested/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-simple/plan/async-calc-simple.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-simple/plan/async-calc-simple.json
new file mode 100644
index 00000000000..9cd7c8ae544
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-simple/plan/async-calc-simple.json
@@ -0,0 +1,146 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `f0` BIGINT>",
+    "description" : "Calc(select=[a, CAST(a AS BIGINT) AS f0])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-async-calc_1",
+    "configuration" : {
+      "table.exec.async-scalar.buffer-capacity" : "10",
+      "table.exec.async-scalar.max-attempts" : "3",
+      "table.exec.async-scalar.retry-delay" : "100 ms",
+      "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+      "table.exec.async-scalar.timeout" : "3 min"
+    },
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "BIGINT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT>",
+    "description" : "AsyncCalc(select=[a, udf1(f0) AS EXPR$1])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "a1",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, EXPR$1])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-simple/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-simple/savepoint/_metadata
new file mode 100644
index 00000000000..f7904734f67
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-simple/savepoint/_metadata
 differ

Reply via email to