This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 881062f352f [FLINK-34005] Implement restore tests for MiniBatchAssigner node
881062f352f is described below

commit 881062f352f8bf8c21ab7cbea95e111fd82fdf20
Author: bvarghese1 <bvargh...@confluent.io>
AuthorDate: Fri Jan 5 11:46:26 2024 -0800

    [FLINK-34005] Implement restore tests for MiniBatchAssigner node
---
 .../exec/stream/MiniBatchAssignerRestoreTest.java  |  40 +
 .../exec/stream/MiniBatchAssignerTestPrograms.java | 144 ++++
 .../plan/mini-batch-assigner-proc-time.json        | 257 +++++++
 .../savepoint/_metadata                            | Bin 0 -> 13431 bytes
 .../plan/mini-batch-assigner-row-time.json         | 854 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 24113 bytes
 6 files changed, 1295 insertions(+)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerRestoreTest.java
new file mode 100644
index 00000000000..213c96fd6eb
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerRestoreTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Restore tests for {@link StreamExecMiniBatchAssigner}.
+ *
+ * <p>Registers the row-time and proc-time programs declared in {@link
+ * MiniBatchAssignerTestPrograms} with {@link RestoreTestBase}.
+ */
+public class MiniBatchAssignerRestoreTest extends RestoreTestBase {
+
+    /** Binds this test to the {@link StreamExecMiniBatchAssigner} exec node class. */
+    public MiniBatchAssignerRestoreTest() {
+        super(StreamExecMiniBatchAssigner.class);
+    }
+
+    /**
+     * Lists the programs exercised by this test, row-time variant first.
+     *
+     * @return the row-time program followed by the proc-time program
+     */
+    @Override
+    public List<TableTestProgram> programs() {
+        final TableTestProgram rowTimeProgram =
+                MiniBatchAssignerTestPrograms.MINI_BATCH_ASSIGNER_ROW_TIME;
+        final TableTestProgram procTimeProgram =
+                MiniBatchAssignerTestPrograms.MINI_BATCH_ASSIGNER_PROC_TIME;
+        return Arrays.asList(rowTimeProgram, procTimeProgram);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerTestPrograms.java
new file mode 100644
index 00000000000..c2c701f6d1c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerTestPrograms.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.time.Duration;
+
+/**
+ * {@link TableTestProgram} definitions for testing {@link StreamExecMiniBatchAssigner}.
+ *
+ * <p>Both programs enable mini-batching with an allowed latency of one second and a mini-batch
+ * size of 5. Each one declares the rows produced by its source(s) and the rows expected in the
+ * sink, split into a "before restore" and an "after restore" phase.
+ */
+public class MiniBatchAssignerTestPrograms {
+
+    /**
+     * Schema shared by both sources of the row-time program: {@code row_time} is computed from
+     * the string column {@code ts}, and the watermark trails {@code row_time} by one second.
+     */
+    static final String[] ROW_TIME_SCHEMA = {
+        "ts STRING",
+        "id STRING",
+        "num INT",
+        "name STRING",
+        "row_time AS TO_TIMESTAMP(`ts`)",
+        "WATERMARK for `row_time` AS `row_time` - INTERVAL '1' SECOND"
+    };
+
+    /**
+     * Row-time program: joins one-second tumbling windows over {@code source_one_t} and {@code
+     * source_two_t} on {@code name}, {@code window_start} and {@code window_end}.
+     *
+     * <p>Expected sink content: the {@code a} rows (L1/R1) of window 00:00:01-00:00:02 before
+     * restore, and the {@code c} rows (L42/R42) of window 00:00:42-00:00:43 after restore. The
+     * remaining input rows have no partner with the same name in the same window.
+     */
+    static final TableTestProgram MINI_BATCH_ASSIGNER_ROW_TIME =
+            TableTestProgram.of(
+                            "mini-batch-assigner-row-time",
+                            "validates mini batch assigner with row time")
+                    
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
+                    .setupConfig(
+                            
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+                            Duration.ofSeconds(1))
+                    
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_one_t")
+                                    .addSchema(ROW_TIME_SCHEMA)
+                                    .producedBeforeRestore(
+                                            Row.of("2020-10-10 00:00:01", 
"L1", 1, "a"),
+                                            Row.of("2020-10-10 00:00:02", 
"L2", 2, "c"),
+                                            Row.of("2020-10-10 00:00:03", 
"L3", 2, "x"))
+                                    .producedAfterRestore(
+                                            Row.of("2020-10-10 00:00:41", 
"L41", 10, "a"),
+                                            Row.of("2020-10-10 00:00:42", 
"L42", 11, "c"))
+                                    .build())
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_two_t")
+                                    .addSchema(ROW_TIME_SCHEMA)
+                                    .producedBeforeRestore(
+                                            Row.of("2020-10-10 00:00:01", 
"R1", 5, "a"),
+                                            Row.of("2020-10-10 00:00:02", 
"R2", 7, "b"),
+                                            Row.of("2020-10-10 00:00:03", 
"R3", 7, "f"))
+                                    .producedAfterRestore(
+                                            Row.of("2020-10-10 00:00:41", 
"R41", 10, "y"),
+                                            Row.of("2020-10-10 00:00:42", 
"R42", 11, "c"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "window_start TIMESTAMP(3)",
+                                            "window_end TIMESTAMP(3)",
+                                            "name STRING",
+                                            "L_id STRING",
+                                            "L_num INT",
+                                            "R_id STRING",
+                                            "R_num INT")
+                                    .consumedBeforeRestore(
+                                            "+I[2020-10-10T00:00:01, 
2020-10-10T00:00:02, a, L1, 1, R1, 5]")
+                                    .consumedAfterRestore(
+                                            "+I[2020-10-10T00:00:42, 
2020-10-10T00:00:43, c, L42, 11, R42, 11]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT\n"
+                                    + "L.window_start AS window_start,\n"
+                                    + "L.window_end AS window_end,\n"
+                                    + "L.name AS name,\n"
+                                    + "L.id AS L_id,\n"
+                                    + "L.num AS L_num,\n"
+                                    + "R.id AS R_id,\n"
+                                    + "R.num AS R_num\n"
+                                    + "FROM\n"
+                                    + "(\n"
+                                    + "    SELECT * FROM TABLE(TUMBLE(TABLE 
source_one_t, DESCRIPTOR(row_time), INTERVAL '1' SECOND))\n"
+                                    + ") L\n"
+                                    + "JOIN\n"
+                                    + "(\n"
+                                    + "    SELECT * FROM TABLE(TUMBLE(TABLE 
source_two_t, DESCRIPTOR(row_time), INTERVAL '1' SECOND))\n"
+                                    + ") R\n"
+                                    + "ON L.name = R.name\n"
+                                    + "AND L.window_start = R.window_start\n"
+                                    + "AND L.window_end = R.window_end")
+                    .build();
+
+    /**
+     * Proc-time program: computes {@code COUNT(DISTINCT a)} grouped by {@code b} over {@code
+     * source_t}, whose schema declares no time attribute or watermark.
+     *
+     * <p>Expected sink changelog: inserts for {@code b=1} (count 1) and {@code b=2} (count 2)
+     * before restore; after restore, {@code b=2} is updated from 2 to 3 via an update-before /
+     * update-after pair, and {@code b=4} is inserted with count 1.
+     */
+    static final TableTestProgram MINI_BATCH_ASSIGNER_PROC_TIME =
+            TableTestProgram.of(
+                            "mini-batch-assigner-proc-time",
+                            "validates mini batch assigner with proc time")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT", "b BIGINT", "c 
VARCHAR")
+                                    .producedBeforeRestore(
+                                            Row.of(1, 1L, "hi"),
+                                            Row.of(2, 2L, "hello"),
+                                            Row.of(3, 2L, "hello world"))
+                                    .producedAfterRestore(
+                                            Row.of(3, 2L, "foo"),
+                                            Row.of(4, 4L, "bar"),
+                                            Row.of(5, 2L, "foo bar"))
+                                    .build())
+                    
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
+                    .setupConfig(
+                            
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+                            Duration.ofSeconds(1))
+                    
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("b BIGINT", "a BIGINT")
+                                    .consumedBeforeRestore("+I[1, 1]", "+I[2, 
2]")
+                                    .consumedAfterRestore("-U[2, 2]", "+U[2, 
3]", "+I[4, 1]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t\n"
+                                    + "     SELECT\n"
+                                    + "         b,\n"
+                                    + "         COUNT(DISTINCT a) AS a\n"
+                                    + "     FROM source_t\n"
+                                    + "     GROUP BY b")
+                    .build();
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/plan/mini-batch-assigner-proc-time.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/plan/mini-batch-assigner-proc-time.json
new file mode 100644
index 00000000000..8210b80a0b2
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/plan/mini-batch-assigner-proc-time.json
@@ -0,0 +1,257 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 18,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 1 ], [ 0 ] ],
+        "producedType" : "ROW<`b` BIGINT, `a` INT> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`b` BIGINT, `a` INT> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`b` BIGINT, `a` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, project=[b, a], metadata=[]]], fields=[b, a])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 19,
+    "type" : "stream-exec-mini-batch-assigner_1",
+    "miniBatchInterval" : {
+      "interval" : 1000,
+      "mode" : "ProcTime"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `a` INT>",
+    "description" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])"
+  }, {
+    "id" : 20,
+    "type" : "stream-exec-local-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "a",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : true,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ false ],
+    "needRetraction" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "count$0",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "distinct$0",
+        "fieldType" : {
+          "type" : "RAW",
+          "class" : "org.apache.flink.table.api.dataview.MapView",
+          "externalDataType" : {
+            "type" : "STRUCTURED_TYPE",
+            "implementationClass" : 
"org.apache.flink.table.api.dataview.MapView",
+            "attributes" : [ {
+              "name" : "map",
+              "attributeType" : "MAP<INT, BIGINT NOT NULL>"
+            } ]
+          }
+        }
+      } ]
+    },
+    "description" : "LocalGroupAggregate(groupBy=[b], select=[b, 
COUNT(distinct$0 a) AS count$0, DISTINCT(a) AS distinct$0])"
+  }, {
+    "id" : 21,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "b",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "count$0",
+        "fieldType" : "BIGINT"
+      }, {
+        "name" : "distinct$0",
+        "fieldType" : {
+          "type" : "RAW",
+          "class" : "org.apache.flink.table.api.dataview.MapView",
+          "externalDataType" : {
+            "type" : "STRUCTURED_TYPE",
+            "implementationClass" : 
"org.apache.flink.table.api.dataview.MapView",
+            "attributes" : [ {
+              "name" : "map",
+              "attributeType" : "MAP<INT, BIGINT NOT NULL>"
+            } ]
+          }
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[b]])"
+  }, {
+    "id" : 22,
+    "type" : "stream-exec-global-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "5"
+    },
+    "grouping" : [ 0 ],
+    "aggCalls" : [ {
+      "name" : "a",
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
+      "argList" : [ 1 ],
+      "filterArg" : -1,
+      "distinct" : true,
+      "approximate" : false,
+      "ignoreNulls" : false,
+      "type" : "BIGINT NOT NULL"
+    } ],
+    "aggCallNeedRetractions" : [ false ],
+    "localAggInputRowType" : "ROW<`b` BIGINT, `a` INT>",
+    "generateUpdateBefore" : true,
+    "needRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "globalGroupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `a` BIGINT NOT NULL>",
+    "description" : "GlobalGroupAggregate(groupBy=[b], select=[b, 
COUNT(distinct$0 count$0) AS a])"
+  }, {
+    "id" : 23,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
+    "inputUpsertKey" : [ 0 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`b` BIGINT, `a` BIGINT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[b, a])"
+  } ],
+  "edges" : [ {
+    "source" : 18,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 19,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 20,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 21,
+    "target" : 22,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 22,
+    "target" : 23,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/savepoint/_metadata
new file mode 100644
index 00000000000..1afe7af6919
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/plan/mini-batch-assigner-row-time.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/plan/mini-batch-assigner-row-time.json
new file mode 100644
index 00000000000..7870d46a833
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/plan/mini-batch-assigner-row-time.json
@@ -0,0 +1,854 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_one_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "ts",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "id",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "num",
+              "dataType" : "INT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "row_time",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 0,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`ts`)"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "row_time",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 4,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`row_time` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`ts` VARCHAR(2147483647), `id` VARCHAR(2147483647), 
`num` INT, `name` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_one_t]], fields=[ts, id, num, name])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` 
VARCHAR(2147483647), `row_time` TIMESTAMP(3)>",
+    "description" : "Calc(select=[id, num, name, TO_TIMESTAMP(ts) AS 
row_time])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 3,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "id",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "num",
+        "fieldType" : "INT"
+      }, {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "row_time",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[row_time], 
watermark=[(row_time - 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-mini-batch-assigner_1",
+    "miniBatchInterval" : {
+      "interval" : 1000,
+      "mode" : "RowTime"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "id",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "num",
+        "fieldType" : "INT"
+      }, {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "row_time",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "MiniBatchAssigner(interval=[1000ms], mode=[RowTime])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-window-table-function_1",
+    "configuration" : {
+      "table.local-time-zone" : "default"
+    },
+    "windowing" : {
+      "strategy" : "TimeAttribute",
+      "window" : {
+        "type" : "TumblingWindow",
+        "size" : "PT1S"
+      },
+      "timeAttributeType" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      },
+      "timeAttributeIndex" : 3,
+      "isRowtime" : true
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "id",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "num",
+        "fieldType" : "INT"
+      }, {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "row_time",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "window_start",
+        "fieldType" : "TIMESTAMP(3) NOT NULL"
+      }, {
+        "name" : "window_end",
+        "fieldType" : "TIMESTAMP(3) NOT NULL"
+      }, {
+        "name" : "window_time",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WindowTableFunction(window=[TUMBLE(time_col=[row_time], 
size=[1 s])])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "TIMESTAMP(3) NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "TIMESTAMP(3) NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` 
VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL>",
+    "description" : "Calc(select=[id, num, name, window_start, window_end])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 2 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` 
VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL>",
+    "description" : "Exchange(distribution=[hash[name]])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_two_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "ts",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "id",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "num",
+              "dataType" : "INT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "row_time",
+              "kind" : "COMPUTED",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "internalName" : "$TO_TIMESTAMP$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 0,
+                    "type" : "VARCHAR(2147483647)"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "TO_TIMESTAMP(`ts`)"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "row_time",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "SPECIAL",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 4,
+                    "type" : "TIMESTAMP(3)"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1000",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3)"
+                },
+                "serializableString" : "`row_time` - INTERVAL '1' SECOND"
+              }
+            } ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`ts` VARCHAR(2147483647), `id` VARCHAR(2147483647), 
`num` INT, `name` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_two_t]], fields=[ts, id, num, name])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "internalName" : "$TO_TIMESTAMP$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` 
VARCHAR(2147483647), `row_time` TIMESTAMP(3)>",
+    "description" : "Calc(select=[id, num, name, TO_TIMESTAMP(ts) AS 
row_time])"
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-watermark-assigner_1",
+    "watermarkExpr" : {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$-$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 3,
+        "type" : "TIMESTAMP(3)"
+      }, {
+        "kind" : "LITERAL",
+        "value" : "1000",
+        "type" : "INTERVAL SECOND(6) NOT NULL"
+      } ],
+      "type" : "TIMESTAMP(3)"
+    },
+    "rowtimeFieldIndex" : 3,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "id",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "num",
+        "fieldType" : "INT"
+      }, {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "row_time",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WatermarkAssigner(rowtime=[row_time], 
watermark=[(row_time - 1000:INTERVAL SECOND)])"
+  }, {
+    "id" : 11,
+    "type" : "stream-exec-mini-batch-assigner_1",
+    "miniBatchInterval" : {
+      "interval" : 1000,
+      "mode" : "RowTime"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "id",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "num",
+        "fieldType" : "INT"
+      }, {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "row_time",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "MiniBatchAssigner(interval=[1000ms], mode=[RowTime])"
+  }, {
+    "id" : 12,
+    "type" : "stream-exec-window-table-function_1",
+    "configuration" : {
+      "table.local-time-zone" : "default"
+    },
+    "windowing" : {
+      "strategy" : "TimeAttribute",
+      "window" : {
+        "type" : "TumblingWindow",
+        "size" : "PT1S"
+      },
+      "timeAttributeType" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      },
+      "timeAttributeIndex" : 3,
+      "isRowtime" : true
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "id",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "num",
+        "fieldType" : "INT"
+      }, {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "row_time",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      }, {
+        "name" : "window_start",
+        "fieldType" : "TIMESTAMP(3) NOT NULL"
+      }, {
+        "name" : "window_end",
+        "fieldType" : "TIMESTAMP(3) NOT NULL"
+      }, {
+        "name" : "window_time",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "WindowTableFunction(window=[TUMBLE(time_col=[row_time], 
size=[1 s])])"
+  }, {
+    "id" : 13,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "TIMESTAMP(3) NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "TIMESTAMP(3) NOT NULL"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` 
VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL>",
+    "description" : "Calc(select=[id, num, name, window_start, window_end])"
+  }, {
+    "id" : 14,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 2 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` 
VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL>",
+    "description" : "Exchange(distribution=[hash[name]])"
+  }, {
+    "id" : 15,
+    "type" : "stream-exec-window-join_1",
+    "configuration" : {
+      "table.local-time-zone" : "default"
+    },
+    "joinSpec" : {
+      "joinType" : "INNER",
+      "leftKeys" : [ 2 ],
+      "rightKeys" : [ 2 ],
+      "filterNulls" : [ true ],
+      "nonEquiCondition" : null
+    },
+    "leftWindowing" : {
+      "strategy" : "WindowAttached",
+      "window" : {
+        "type" : "TumblingWindow",
+        "size" : "PT1S"
+      },
+      "timeAttributeType" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      },
+      "windowStart" : 3,
+      "windowEnd" : 4,
+      "isRowtime" : true
+    },
+    "rightWindowing" : {
+      "strategy" : "WindowAttached",
+      "window" : {
+        "type" : "TumblingWindow",
+        "size" : "PT1S"
+      },
+      "timeAttributeType" : {
+        "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+        "precision" : 3,
+        "kind" : "ROWTIME"
+      },
+      "windowStart" : 3,
+      "windowEnd" : 4,
+      "isRowtime" : true
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` 
VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL, `id0` VARCHAR(2147483647), `num0` INT, `name0` 
VARCHAR(2147483647), `window_start0` TIMESTAMP(3) NOT NULL, `window_end0` 
TIMESTAMP(3) NOT NULL>",
+    "description" : "WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[1 s])], 
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 
s])], joinType=[InnerJoin], where=[(name = name0)], select=[id, num, name, 
window_start, window_end, id0, num0, name0, window_start0, window_end0])"
+  }, {
+    "id" : 16,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "TIMESTAMP(3) NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 4,
+      "type" : "TIMESTAMP(3) NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 5,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 6,
+      "type" : "INT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL, `name` VARCHAR(2147483647), `L_id` VARCHAR(2147483647), 
`L_num` INT, `R_id` VARCHAR(2147483647), `R_num` INT>",
+    "description" : "Calc(select=[window_start, window_end, name, id AS L_id, 
num AS L_num, id0 AS R_id, num0 AS R_num])"
+  }, {
+    "id" : 17,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "window_start",
+              "dataType" : "TIMESTAMP(3)"
+            }, {
+              "name" : "window_end",
+              "dataType" : "TIMESTAMP(3)"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "L_id",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "L_num",
+              "dataType" : "INT"
+            }, {
+              "name" : "R_id",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "R_num",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end` 
TIMESTAMP(3) NOT NULL, `name` VARCHAR(2147483647), `L_id` VARCHAR(2147483647), 
`L_num` INT, `R_id` VARCHAR(2147483647), `R_num` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[window_start, window_end, name, L_id, L_num, R_id, R_num])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 15,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 16,
+    "target" : 17,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/savepoint/_metadata
new file mode 100644
index 00000000000..804773b2a4a
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/savepoint/_metadata
 differ


Reply via email to