This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4264c30d367 [FLINK-34095] Adds restore tests for StreamExecAsyncCalc
(#24220)
4264c30d367 is described below
commit 4264c30d367c1847d702ea5438da64876559a95d
Author: Alan Sheinberg <[email protected]>
AuthorDate: Tue Feb 6 09:53:36 2024 -0800
[FLINK-34095] Adds restore tests for StreamExecAsyncCalc (#24220)
---
.../converter/FunctionDefinitionConvertRule.java | 1 +
.../nodes/exec/stream/AsyncCalcRestoreTest.java | 43 ++
.../nodes/exec/stream/AsyncCalcTestPrograms.java | 297 +++++++++++
.../plan/async-calc-complex.json | 564 +++++++++++++++++++++
.../async-calc-complex/savepoint/_metadata | Bin 0 -> 28698 bytes
.../plan/async-calc-condition.json | 186 +++++++
.../async-calc-condition/savepoint/_metadata | Bin 0 -> 7337 bytes
.../plan/async-calc-failure-exception.json | 104 ++++
.../savepoint/_metadata | Bin 0 -> 6866 bytes
.../async-calc-nested/plan/async-calc-nested.json | 226 +++++++++
.../async-calc-nested/savepoint/_metadata | Bin 0 -> 11243 bytes
.../async-calc-simple/plan/async-calc-simple.json | 146 ++++++
.../async-calc-simple/savepoint/_metadata | Bin 0 -> 8428 bytes
13 files changed, 1567 insertions(+)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/FunctionDefinitionConvertRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/FunctionDefinitionConvertRule.java
index ab7f34114e1..76407f8d778 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/FunctionDefinitionConvertRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/FunctionDefinitionConvertRule.java
@@ -56,6 +56,7 @@ public class FunctionDefinitionConvertRule implements
CallExpressionConvertRule
switch (definition.getKind()) {
case SCALAR:
+ case ASYNC_SCALAR:
case TABLE:
final List<RexNode> args =
call.getChildren().stream()
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/AsyncCalcRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/AsyncCalcRestoreTest.java
new file mode 100644
index 00000000000..b7a6ef515a6
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/AsyncCalcRestoreTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecAsyncCalc}. */
+public class AsyncCalcRestoreTest extends RestoreTestBase {
+
+ public AsyncCalcRestoreTest() {
+ super(StreamExecCalc.class);
+ }
+
+ @Override
+ public List<TableTestProgram> programs() {
+ return Arrays.asList(
+ AsyncCalcTestPrograms.ASYNC_CALC_UDF_SIMPLE,
+ AsyncCalcTestPrograms.ASYNC_CALC_UDF_COMPLEX,
+ AsyncCalcTestPrograms.ASYNC_CALC_UDF_NESTED,
+ AsyncCalcTestPrograms.ASYNC_CALC_UDF_CONDITION,
+ AsyncCalcTestPrograms.ASYNC_CALC_UDF_FAILURE_EXCEPTION);
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/AsyncCalcTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/AsyncCalcTestPrograms.java
new file mode 100644
index 00000000000..badbb4108e2
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/AsyncCalcTestPrograms.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.AsyncScalarFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.TimeZone;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY;
+import static
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_MAX_ATTEMPTS;
+import static
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_ASYNC_SCALAR_RETRY_DELAY;
+import static org.assertj.core.api.Assertions.fail;
+
+/** {@link TableTestProgram} definitions for testing {@link
StreamExecAsyncCalc}. */
+public class AsyncCalcTestPrograms {
+
+ static final TableTestProgram ASYNC_CALC_UDF_SIMPLE =
+ TableTestProgram.of("async-calc-simple", "validates async calc
node with simple UDF")
+ .setupTemporaryCatalogFunction("udf1",
AsyncJavaFunc0.class)
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema("a INT")
+ .producedBeforeRestore(Row.of(5))
+ .producedAfterRestore(Row.of(5))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("a INT", "a1 BIGINT")
+ .consumedBeforeRestore(Row.of(5, 6L))
+ .consumedAfterRestore(Row.of(5, 6L))
+ .build())
+ .runSql("INSERT INTO sink_t SELECT a, udf1(a) FROM
source_t")
+ .build();
+
+ static final TableTestProgram ASYNC_CALC_UDF_COMPLEX =
+ TableTestProgram.of("async-calc-complex", "validates calc node
with complex UDFs")
+ .setupTemporaryCatalogFunction("udf1",
AsyncJavaFunc0.class)
+ .setupTemporaryCatalogFunction("udf2",
AsyncJavaFunc1.class)
+ .setupTemporarySystemFunction("udf3", AsyncJavaFunc2.class)
+ .setupTemporarySystemFunction("udf4",
AsyncUdfWithOpen.class)
+ .setupCatalogFunction("udf5", AsyncJavaFunc5.class)
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema(
+ "a BIGINT, b INT NOT NULL, c
VARCHAR, d TIMESTAMP(3)")
+ .producedBeforeRestore(
+ Row.of(
+ 5L,
+ 11,
+ "hello world",
+ LocalDateTime.of(2023, 12,
16, 1, 1, 1, 123)))
+ .producedAfterRestore(
+ Row.of(
+ 5L,
+ 11,
+ "hello world",
+ LocalDateTime.of(2023, 12,
16, 1, 1, 1, 123)))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(
+ "a BIGINT",
+ "a1 VARCHAR",
+ "b INT NOT NULL",
+ "b1 VARCHAR",
+ "c1 VARCHAR",
+ "c2 VARCHAR",
+ "d1 TIMESTAMP(3)")
+ .consumedBeforeRestore(
+ Row.of(
+ 5L,
+ "5",
+ 11,
+ "11 and 11 and
1702688461000",
+ "hello world11",
+ "$hello",
+ LocalDateTime.of(2023, 12,
16, 01, 01, 00, 0)))
+ .consumedAfterRestore(
+ Row.of(
+ 5L,
+ "5",
+ 11,
+ "11 and 11 and
1702688461000",
+ "hello world11",
+ "$hello",
+ LocalDateTime.of(2023, 12,
16, 01, 01, 00, 0)))
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT "
+ + "a, "
+ + "cast(a as VARCHAR) as a1, "
+ + "b, "
+ + "udf2(b, b, d) as b1, "
+ + "udf3(c, b) as c1, "
+ + "udf4(substring(c, 1, 5)) as c2, "
+ + "udf5(d, 1000) as d1 "
+ + "from source_t where "
+ + "(udf1(a) > 0 or (a * b) < 100) and b >
10")
+ .build();
+
+ static final TableTestProgram ASYNC_CALC_UDF_NESTED =
+ TableTestProgram.of(
+ "async-calc-nested",
+ "validates async calc node when chained through
nested calls")
+ .setupTemporaryCatalogFunction("udf1",
AsyncJavaFunc0.class)
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema("a INT")
+ .producedBeforeRestore(Row.of(5))
+ .producedAfterRestore(Row.of(5))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("a INT", "a1 BIGINT")
+ .consumedBeforeRestore(Row.of(5, 8L))
+ .consumedAfterRestore(Row.of(5, 8L))
+ .build())
+ .runSql("INSERT INTO sink_t SELECT a, udf1(udf1(udf1(a)))
FROM source_t")
+ .build();
+
+ static final TableTestProgram ASYNC_CALC_UDF_CONDITION =
+ TableTestProgram.of(
+ "async-calc-condition",
+ "validates async calc node with the udf written in
the condition of the SQL query")
+ .setupTemporaryCatalogFunction("udf1",
AsyncJavaFunc0.class)
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema("a INT")
+ .producedBeforeRestore(Row.of(5),
Row.of(6), Row.of(4))
+ .producedAfterRestore(Row.of(7), Row.of(3))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("a INT")
+ .consumedBeforeRestore(Row.of(6))
+ .consumedAfterRestore(Row.of(7))
+ .build())
+ .runSql("INSERT INTO sink_t SELECT a FROM source_t WHERE
udf1(a) > 6")
+ .build();
+
+ static final TableTestProgram ASYNC_CALC_UDF_FAILURE_EXCEPTION =
+ TableTestProgram.of(
+ "async-calc-failure-exception",
+ "validates async calc node that fails some number
of times and then recovers after restore")
+ .setupConfig(TABLE_EXEC_ASYNC_SCALAR_RETRY_DELAY,
Duration.ofMillis(3000))
+ .setupConfig(TABLE_EXEC_ASYNC_SCALAR_MAX_ATTEMPTS, 3)
+ .setupConfig(TABLE_EXEC_ASYNC_SCALAR_BUFFER_CAPACITY, 5)
+ .setupTemporaryCatalogFunction("udf1",
TwosFailFunction.class)
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema("a INT")
+ .producedBeforeRestore(Row.of(1),
Row.of(2))
+ .producedAfterRestore(Row.of(3))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("a INT")
+ .consumedBeforeRestore(Row.of(1))
+ .consumedAfterRestore(Row.of(2), Row.of(3))
+ .build())
+ .runSql("INSERT INTO sink_t SELECT udf1(a) FROM source_t")
+ .build();
+
+ /** Increment input. */
+ public static class AsyncJavaFunc0 extends AsyncScalarFunction {
+ public void eval(CompletableFuture<Long> future, Long l) {
+ future.complete(l + 1);
+ }
+ }
+
+ /** Concatenate inputs as strings. */
+ public static class AsyncJavaFunc1 extends AsyncScalarFunction {
+ public void eval(
+ CompletableFuture<String> future,
+ Integer a,
+ int b,
+ @DataTypeHint("TIMESTAMP(3)") TimestampData c) {
+ Long ts = (c == null) ? null : c.getMillisecond();
+ future.complete(a + " and " + b + " and " + ts);
+ }
+ }
+
+ /** Append product to string. */
+ public static class AsyncJavaFunc2 extends AsyncScalarFunction {
+ public void eval(CompletableFuture<String> future, String s,
Integer... a) {
+ int m = 1;
+ for (int n : a) {
+ m *= n;
+ }
+ future.complete(s + m);
+ }
+ }
+
+ /**
+ * A UDF minus Timestamp with the specified offset. This UDF also ensures
open and close are
+ * called.
+ */
+ public static class AsyncJavaFunc5 extends AsyncScalarFunction {
+ // these fields must be reset to false at the beginning of tests,
+ // otherwise the static fields will be changed by several tests
concurrently
+ public static boolean openCalled = false;
+ public static boolean closeCalled = false;
+
+ @Override
+ public void open(FunctionContext context) {
+ openCalled = true;
+ }
+
+ public void eval(
+ @DataTypeHint("TIMESTAMP(3)") CompletableFuture<Timestamp>
future,
+ @DataTypeHint("TIMESTAMP(3)") TimestampData timestampData,
+ Integer offset) {
+ if (!openCalled) {
+ fail("Open was not called before run.");
+ }
+ if (timestampData == null || offset == null) {
+ future.complete(null);
+ } else {
+ long ts = timestampData.getMillisecond() - offset;
+ int tzOffset = TimeZone.getDefault().getOffset(ts);
+ future.complete(new Timestamp(ts - tzOffset));
+ }
+ }
+
+ @Override
+ public void close() {
+ closeCalled = true;
+ }
+ }
+
+ /** Testing open method is called. */
+ public static class AsyncUdfWithOpen extends AsyncScalarFunction {
+
+ // transient make this class serializable by class name
+ private transient boolean isOpened = false;
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ super.open(context);
+ this.isOpened = true;
+ }
+
+ public void eval(CompletableFuture<String> future, String c) {
+ if (!isOpened) {
+ throw new IllegalStateException("Open method is not called!");
+ }
+ future.complete("$" + c);
+ }
+ }
+
+ /** Testing that failing requests can recover after restore. */
+ public static class TwosFailFunction extends AsyncScalarFunction {
+ private static final int TOTAL_FAILURES = 1;
+
+ private final AtomicInteger calls = new AtomicInteger(0);
+
+ public void eval(CompletableFuture<Integer> future, Integer c) {
+ if (c != 2) {
+ future.complete(c);
+ return;
+ }
+ if (calls.incrementAndGet() > TOTAL_FAILURES) {
+ future.complete(c);
+ return;
+ }
+ throw new RuntimeException("Failure " + calls.get());
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-complex/plan/async-calc-complex.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-complex/plan/async-calc-complex.json
new file mode 100644
index 00000000000..f55924010da
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-complex/plan/async-calc-complex.json
@@ -0,0 +1,564 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "d",
+ "dataType" : "TIMESTAMP(3)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ ]
+ } ]
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647),
`d` TIMESTAMP(3)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t, filter=[]]], fields=[a, b, c, d])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-async-calc_1",
+ "configuration" : {
+ "table.exec.async-scalar.buffer-capacity" : "10",
+ "table.exec.async-scalar.max-attempts" : "3",
+ "table.exec.async-scalar.retry-delay" : "100 ms",
+ "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+ "table.exec.async-scalar.timeout" : "3 min"
+ },
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `d` TIMESTAMP(3), `c`
VARCHAR(2147483647), `f0` BIGINT>",
+ "description" : "AsyncCalc(select=[a, b, d, c, udf1(a) AS f0])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$SUBSTRING$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 5,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$AND$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$OR$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 0,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$<$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$*$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BIGINT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 100,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ } ],
+ "type" : "BOOLEAN"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 10,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `d` TIMESTAMP(3), `c`
VARCHAR(2147483647), `f0` VARCHAR(2147483647)>",
+ "description" : "Calc(select=[a, b, d, c, SUBSTRING(c, 1, 5) AS f0],
where=[(((f0 > 0) OR ((a * b) < 100)) AND (b > 10))])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-async-calc_1",
+ "configuration" : {
+ "table.exec.async-scalar.buffer-capacity" : "10",
+ "table.exec.async-scalar.max-attempts" : "3",
+ "table.exec.async-scalar.retry-delay" : "100 ms",
+ "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+ "table.exec.async-scalar.timeout" : "3 min"
+ },
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf2`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647),
`f0` VARCHAR(2147483647), `d` TIMESTAMP(3), `f00` VARCHAR(2147483647)>",
+ "description" : "AsyncCalc(select=[a, b, c, f0, d, udf2(b, b, d) AS f00])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-async-calc_1",
+ "configuration" : {
+ "table.exec.async-scalar.buffer-capacity" : "10",
+ "table.exec.async-scalar.max-attempts" : "3",
+ "table.exec.async-scalar.retry-delay" : "100 ms",
+ "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+ "table.exec.async-scalar.timeout" : "3 min"
+ },
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "CALL",
+ "systemName" : "udf3",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `f00`
VARCHAR(2147483647), `f0` VARCHAR(2147483647), `d` TIMESTAMP(3), `f01`
VARCHAR(2147483647)>",
+ "description" : "AsyncCalc(select=[a, b, f00, f0, d, udf3(c, b) AS f01])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-async-calc_1",
+ "configuration" : {
+ "table.exec.async-scalar.buffer-capacity" : "10",
+ "table.exec.async-scalar.max-attempts" : "3",
+ "table.exec.async-scalar.retry-delay" : "100 ms",
+ "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+ "table.exec.async-scalar.timeout" : "3 min"
+ },
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "CALL",
+ "systemName" : "udf4",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `f00`
VARCHAR(2147483647), `f01` VARCHAR(2147483647), `d` TIMESTAMP(3), `f0`
VARCHAR(2147483647)>",
+ "description" : "AsyncCalc(select=[a, b, f00, f01, d, udf4(f0) AS f0])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-async-calc_1",
+ "configuration" : {
+ "table.exec.async-scalar.buffer-capacity" : "10",
+ "table.exec.async-scalar.max-attempts" : "3",
+ "table.exec.async-scalar.retry-delay" : "100 ms",
+ "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+ "table.exec.async-scalar.timeout" : "3 min"
+ },
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf5`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 1000,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `f0`
VARCHAR(2147483647), `f1` VARCHAR(2147483647), `f2` VARCHAR(2147483647), `f3`
TIMESTAMP(3)>",
+ "description" : "AsyncCalc(select=[a, b, f00 AS f0, f01 AS f1, f0 AS f2,
udf5(d, 1000) AS f3])"
+ }, {
+ "id" : 8,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ } ],
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `a1` VARCHAR(2147483647), `b` INT NOT
NULL, `b1` VARCHAR(2147483647), `c1` VARCHAR(2147483647), `c2`
VARCHAR(2147483647), `d1` TIMESTAMP(3)>",
+ "description" : "Calc(select=[a, CAST(a AS VARCHAR(2147483647)) AS a1, b,
f0 AS b1, f1 AS c1, f2 AS c2, f3 AS d1])"
+ }, {
+ "id" : 9,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "a1",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "b",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "b1",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c1",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c2",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "d1",
+ "dataType" : "TIMESTAMP(3)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `a1` VARCHAR(2147483647), `b` INT NOT
NULL, `b1` VARCHAR(2147483647), `c1` VARCHAR(2147483647), `c2`
VARCHAR(2147483647), `d1` TIMESTAMP(3)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a, a1, b, b1, c1, c2, d1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 8,
+ "target" : 9,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-complex/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-complex/savepoint/_metadata
new file mode 100644
index 00000000000..0e75373edb4
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-complex/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-condition/plan/async-calc-condition.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-condition/plan/async-calc-condition.json
new file mode 100644
index 00000000000..225a5805c29
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-condition/plan/async-calc-condition.json
@@ -0,0 +1,186 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 20,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ ]
+ } ]
+ },
+ "outputType" : "ROW<`a` INT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t, filter=[]]], fields=[a])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 21,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `f0` BIGINT>",
+ "description" : "Calc(select=[a, CAST(a AS BIGINT) AS f0])"
+ }, {
+ "id" : 22,
+ "type" : "stream-exec-async-calc_1",
+ "configuration" : {
+ "table.exec.async-scalar.buffer-capacity" : "10",
+ "table.exec.async-scalar.max-attempts" : "3",
+ "table.exec.async-scalar.retry-delay" : "100 ms",
+ "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+ "table.exec.async-scalar.timeout" : "3 min"
+ },
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `f0` BIGINT>",
+ "description" : "AsyncCalc(select=[a, udf1(f0) AS f0])"
+ }, {
+ "id" : 23,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "condition" : {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$>$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "LITERAL",
+ "value" : 6,
+ "type" : "INT NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT>",
+ "description" : "Calc(select=[a], where=[(f0 > 6)])"
+ }, {
+ "id" : 24,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a])"
+ } ],
+ "edges" : [ {
+ "source" : 20,
+ "target" : 21,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 21,
+ "target" : 22,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 22,
+ "target" : 23,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 23,
+ "target" : 24,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-condition/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-condition/savepoint/_metadata
new file mode 100644
index 00000000000..853248dc450
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-condition/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-failure-exception/plan/async-calc-failure-exception.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-failure-exception/plan/async-calc-failure-exception.json
new file mode 100644
index 00000000000..8d411c695b7
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-failure-exception/plan/async-calc-failure-exception.json
@@ -0,0 +1,104 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 25,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t]], fields=[a])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 26,
+ "type" : "stream-exec-async-calc_1",
+ "configuration" : {
+ "table.exec.async-scalar.buffer-capacity" : "5",
+ "table.exec.async-scalar.max-attempts" : "3",
+ "table.exec.async-scalar.retry-delay" : "3 s",
+ "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+ "table.exec.async-scalar.timeout" : "3 min"
+ },
+ "projection" : [ {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "INT"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` INT>",
+ "description" : "AsyncCalc(select=[udf1(a) AS EXPR$0])"
+ }, {
+ "id" : 27,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`EXPR$0` INT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[EXPR$0])"
+ } ],
+ "edges" : [ {
+ "source" : 25,
+ "target" : 26,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 26,
+ "target" : 27,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-failure-exception/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-failure-exception/savepoint/_metadata
new file mode 100644
index 00000000000..afdb709fc80
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-failure-exception/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-nested/plan/async-calc-nested.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-nested/plan/async-calc-nested.json
new file mode 100644
index 00000000000..7d1a7ab32f0
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-nested/plan/async-calc-nested.json
@@ -0,0 +1,226 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 14,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t]], fields=[a])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 15,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `f0` BIGINT>",
+ "description" : "Calc(select=[a, CAST(a AS BIGINT) AS f0])"
+ }, {
+ "id" : 16,
+ "type" : "stream-exec-async-calc_1",
+ "configuration" : {
+ "table.exec.async-scalar.buffer-capacity" : "10",
+ "table.exec.async-scalar.max-attempts" : "3",
+ "table.exec.async-scalar.retry-delay" : "100 ms",
+ "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+ "table.exec.async-scalar.timeout" : "3 min"
+ },
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `f0` BIGINT>",
+ "description" : "AsyncCalc(select=[a, udf1(f0) AS f0])"
+ }, {
+ "id" : 17,
+ "type" : "stream-exec-async-calc_1",
+ "configuration" : {
+ "table.exec.async-scalar.buffer-capacity" : "10",
+ "table.exec.async-scalar.max-attempts" : "3",
+ "table.exec.async-scalar.retry-delay" : "100 ms",
+ "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+ "table.exec.async-scalar.timeout" : "3 min"
+ },
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `f0` BIGINT>",
+ "description" : "AsyncCalc(select=[a, udf1(f0) AS f0])"
+ }, {
+ "id" : 18,
+ "type" : "stream-exec-async-calc_1",
+ "configuration" : {
+ "table.exec.async-scalar.buffer-capacity" : "10",
+ "table.exec.async-scalar.max-attempts" : "3",
+ "table.exec.async-scalar.retry-delay" : "100 ms",
+ "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+ "table.exec.async-scalar.timeout" : "3 min"
+ },
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT>",
+ "description" : "AsyncCalc(select=[a, udf1(f0) AS EXPR$1])"
+ }, {
+ "id" : 19,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "a1",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a, EXPR$1])"
+ } ],
+ "edges" : [ {
+ "source" : 14,
+ "target" : 15,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 15,
+ "target" : 16,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 16,
+ "target" : 17,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 17,
+ "target" : 18,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 18,
+ "target" : 19,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-nested/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-nested/savepoint/_metadata
new file mode 100644
index 00000000000..c5db86ce208
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-nested/savepoint/_metadata
differ
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-simple/plan/async-calc-simple.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-simple/plan/async-calc-simple.json
new file mode 100644
index 00000000000..9cd7c8ae544
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-simple/plan/async-calc-simple.json
@@ -0,0 +1,146 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t]], fields=[a])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `f0` BIGINT>",
+ "description" : "Calc(select=[a, CAST(a AS BIGINT) AS f0])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-async-calc_1",
+ "configuration" : {
+ "table.exec.async-scalar.buffer-capacity" : "10",
+ "table.exec.async-scalar.max-attempts" : "3",
+ "table.exec.async-scalar.retry-delay" : "100 ms",
+ "table.exec.async-scalar.retry-strategy" : "FIXED_DELAY",
+ "table.exec.async-scalar.timeout" : "3 min"
+ },
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`udf1`",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "BIGINT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT>",
+ "description" : "AsyncCalc(select=[a, udf1(f0) AS EXPR$1])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "a1",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[a, EXPR$1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-simple/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-simple/savepoint/_metadata
new file mode 100644
index 00000000000..f7904734f67
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-calc_1/async-calc-simple/savepoint/_metadata
differ