This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 881062f352f [FLINK-34005] Implement restore tests for MiniBatchAssigner node
881062f352f is described below
commit 881062f352f8bf8c21ab7cbea95e111fd82fdf20
Author: bvarghese1 <[email protected]>
AuthorDate: Fri Jan 5 11:46:26 2024 -0800
[FLINK-34005] Implement restore tests for MiniBatchAssigner node
---
.../exec/stream/MiniBatchAssignerRestoreTest.java | 40 +
.../exec/stream/MiniBatchAssignerTestPrograms.java | 144 ++++
.../plan/mini-batch-assigner-proc-time.json | 257 +++++++
.../savepoint/_metadata | Bin 0 -> 13431 bytes
.../plan/mini-batch-assigner-row-time.json | 854 +++++++++++++++++++++
.../savepoint/_metadata | Bin 0 -> 24113 bytes
6 files changed, 1295 insertions(+)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerRestoreTest.java
new file mode 100644
index 00000000000..213c96fd6eb
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerRestoreTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Restore tests for {@link StreamExecMiniBatchAssigner}.
+ *
+ * <p>The constructor hands {@code StreamExecMiniBatchAssigner.class} to {@link RestoreTestBase},
+ * and {@link #programs()} returns the row-time and proc-time programs declared in {@link
+ * MiniBatchAssignerTestPrograms}, in that order.
+ */
+public class MiniBatchAssignerRestoreTest extends RestoreTestBase {
+
+ public MiniBatchAssignerRestoreTest() {
+ super(StreamExecMiniBatchAssigner.class);
+ }
+
+ @Override
+ public List<TableTestProgram> programs() {
+ return Arrays.asList(
+ MiniBatchAssignerTestPrograms.MINI_BATCH_ASSIGNER_ROW_TIME,
+ MiniBatchAssignerTestPrograms.MINI_BATCH_ASSIGNER_PROC_TIME);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerTestPrograms.java
new file mode 100644
index 00000000000..c2c701f6d1c
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiniBatchAssignerTestPrograms.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.time.Duration;
+
+/**
+ * {@link TableTestProgram} definitions for testing {@link StreamExecMiniBatchAssigner}.
+ *
+ * <p>Both programs enable mini-batching with an allowed latency of one second and a mini-batch
+ * size of 5. One reads from sources that declare a watermark (row time); the other reads from a
+ * source without any time attribute (proc time).
+ */
+public class MiniBatchAssignerTestPrograms {
+
+    /**
+     * Schema shared by the two row-time sources: four physical columns, a computed {@code
+     * row_time} column derived from {@code ts}, and a watermark that trails {@code row_time} by
+     * one second.
+     */
+    static final String[] ROW_TIME_SCHEMA = {
+        "ts STRING",
+        "id STRING",
+        "num INT",
+        "name STRING",
+        "row_time AS TO_TIMESTAMP(`ts`)",
+        "WATERMARK for `row_time` AS `row_time` - INTERVAL '1' SECOND"
+    };
+
+    /**
+     * Row-time program: joins two watermarked sources over one-second tumbling windows on equal
+     * {@code name}, {@code window_start} and {@code window_end}. One joined row is expected
+     * before the restore and one after it.
+     */
+    static final TableTestProgram MINI_BATCH_ASSIGNER_ROW_TIME =
+            TableTestProgram.of(
+                            "mini-batch-assigner-row-time",
+                            "validates mini batch assigner with row time")
+                    .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
+                    .setupConfig(
+                            ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+                            Duration.ofSeconds(1))
+                    .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_one_t")
+                                    .addSchema(ROW_TIME_SCHEMA)
+                                    .producedBeforeRestore(
+                                            Row.of("2020-10-10 00:00:01", "L1", 1, "a"),
+                                            Row.of("2020-10-10 00:00:02", "L2", 2, "c"),
+                                            Row.of("2020-10-10 00:00:03", "L3", 2, "x"))
+                                    .producedAfterRestore(
+                                            Row.of("2020-10-10 00:00:41", "L41", 10, "a"),
+                                            Row.of("2020-10-10 00:00:42", "L42", 11, "c"))
+                                    .build())
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_two_t")
+                                    .addSchema(ROW_TIME_SCHEMA)
+                                    .producedBeforeRestore(
+                                            Row.of("2020-10-10 00:00:01", "R1", 5, "a"),
+                                            Row.of("2020-10-10 00:00:02", "R2", 7, "b"),
+                                            Row.of("2020-10-10 00:00:03", "R3", 7, "f"))
+                                    .producedAfterRestore(
+                                            Row.of("2020-10-10 00:00:41", "R41", 10, "y"),
+                                            Row.of("2020-10-10 00:00:42", "R42", 11, "c"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "window_start TIMESTAMP(3)",
+                                            "window_end TIMESTAMP(3)",
+                                            "name STRING",
+                                            "L_id STRING",
+                                            "L_num INT",
+                                            "R_id STRING",
+                                            "R_num INT")
+                                    .consumedBeforeRestore(
+                                            "+I[2020-10-10T00:00:01, 2020-10-10T00:00:02, a, L1, 1, R1, 5]")
+                                    .consumedAfterRestore(
+                                            "+I[2020-10-10T00:00:42, 2020-10-10T00:00:43, c, L42, 11, R42, 11]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT\n"
+                                    + "L.window_start AS window_start,\n"
+                                    + "L.window_end AS window_end,\n"
+                                    + "L.name AS name,\n"
+                                    + "L.id AS L_id,\n"
+                                    + "L.num AS L_num,\n"
+                                    + "R.id AS R_id,\n"
+                                    + "R.num AS R_num\n"
+                                    + "FROM\n"
+                                    + "(\n"
+                                    + " SELECT * FROM TABLE(TUMBLE(TABLE source_one_t, DESCRIPTOR(row_time), INTERVAL '1' SECOND))\n"
+                                    + ") L\n"
+                                    + "JOIN\n"
+                                    + "(\n"
+                                    + " SELECT * FROM TABLE(TUMBLE(TABLE source_two_t, DESCRIPTOR(row_time), INTERVAL '1' SECOND))\n"
+                                    + ") R\n"
+                                    + "ON L.name = R.name\n"
+                                    + "AND L.window_start = R.window_start\n"
+                                    + "AND L.window_end = R.window_end")
+                    .build();
+
+    /**
+     * Proc-time program: counts distinct {@code a} per {@code b} from a source that declares no
+     * time attribute. The rows consumed after the restore include an update ({@code -U}/{@code
+     * +U}) for key {@code 2}, which was first emitted before the restore.
+     */
+    static final TableTestProgram MINI_BATCH_ASSIGNER_PROC_TIME =
+            TableTestProgram.of(
+                            "mini-batch-assigner-proc-time",
+                            "validates mini batch assigner with proc time")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema("a INT", "b BIGINT", "c VARCHAR")
+                                    .producedBeforeRestore(
+                                            Row.of(1, 1L, "hi"),
+                                            Row.of(2, 2L, "hello"),
+                                            Row.of(3, 2L, "hello world"))
+                                    .producedAfterRestore(
+                                            Row.of(3, 2L, "foo"),
+                                            Row.of(4, 4L, "bar"),
+                                            Row.of(5, 2L, "foo bar"))
+                                    .build())
+                    .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
+                    .setupConfig(
+                            ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+                            Duration.ofSeconds(1))
+                    .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5L)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("b BIGINT", "a BIGINT")
+                                    .consumedBeforeRestore("+I[1, 1]", "+I[2, 2]")
+                                    .consumedAfterRestore("-U[2, 2]", "+U[2, 3]", "+I[4, 1]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t\n"
+                                    + " SELECT\n"
+                                    + " b,\n"
+                                    + " COUNT(DISTINCT a) AS a\n"
+                                    + " FROM source_t\n"
+                                    + " GROUP BY b")
+                    .build();
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/plan/mini-batch-assigner-proc-time.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/plan/mini-batch-assigner-proc-time.json
new file mode 100644
index 00000000000..8210b80a0b2
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/plan/mini-batch-assigner-proc-time.json
@@ -0,0 +1,257 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 18,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 1 ], [ 0 ] ],
+ "producedType" : "ROW<`b` BIGINT, `a` INT> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`b` BIGINT, `a` INT> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`b` BIGINT, `a` INT>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t, project=[b, a], metadata=[]]], fields=[b, a])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 19,
+ "type" : "stream-exec-mini-batch-assigner_1",
+ "miniBatchInterval" : {
+ "interval" : 1000,
+ "mode" : "ProcTime"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `a` INT>",
+ "description" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])"
+ }, {
+ "id" : 20,
+ "type" : "stream-exec-local-group-aggregate_1",
+ "configuration" : {
+ "table.exec.mini-batch.enabled" : "true",
+ "table.exec.mini-batch.size" : "5"
+ },
+ "grouping" : [ 0 ],
+ "aggCalls" : [ {
+ "name" : "a",
+ "syntax" : "FUNCTION_STAR",
+ "internalName" : "$COUNT$1",
+ "argList" : [ 1 ],
+ "filterArg" : -1,
+ "distinct" : true,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "aggCallNeedRetractions" : [ false ],
+ "needRetraction" : false,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "count$0",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "distinct$0",
+ "fieldType" : {
+ "type" : "RAW",
+ "class" : "org.apache.flink.table.api.dataview.MapView",
+ "externalDataType" : {
+ "type" : "STRUCTURED_TYPE",
+ "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
+ "attributes" : [ {
+ "name" : "map",
+ "attributeType" : "MAP<INT, BIGINT NOT NULL>"
+ } ]
+ }
+ }
+ } ]
+ },
+ "description" : "LocalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 a) AS count$0, DISTINCT(a) AS distinct$0])"
+ }, {
+ "id" : 21,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "b",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "count$0",
+ "fieldType" : "BIGINT"
+ }, {
+ "name" : "distinct$0",
+ "fieldType" : {
+ "type" : "RAW",
+ "class" : "org.apache.flink.table.api.dataview.MapView",
+ "externalDataType" : {
+ "type" : "STRUCTURED_TYPE",
+ "implementationClass" : "org.apache.flink.table.api.dataview.MapView",
+ "attributes" : [ {
+ "name" : "map",
+ "attributeType" : "MAP<INT, BIGINT NOT NULL>"
+ } ]
+ }
+ }
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[b]])"
+ }, {
+ "id" : 22,
+ "type" : "stream-exec-global-group-aggregate_1",
+ "configuration" : {
+ "table.exec.mini-batch.enabled" : "true",
+ "table.exec.mini-batch.size" : "5"
+ },
+ "grouping" : [ 0 ],
+ "aggCalls" : [ {
+ "name" : "a",
+ "syntax" : "FUNCTION_STAR",
+ "internalName" : "$COUNT$1",
+ "argList" : [ 1 ],
+ "filterArg" : -1,
+ "distinct" : true,
+ "approximate" : false,
+ "ignoreNulls" : false,
+ "type" : "BIGINT NOT NULL"
+ } ],
+ "aggCallNeedRetractions" : [ false ],
+ "localAggInputRowType" : "ROW<`b` BIGINT, `a` INT>",
+ "generateUpdateBefore" : true,
+ "needRetraction" : false,
+ "state" : [ {
+ "index" : 0,
+ "ttl" : "0 ms",
+ "name" : "globalGroupAggregateState"
+ } ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `a` BIGINT NOT NULL>",
+ "description" : "GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS a])"
+ }, {
+ "id" : 23,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
+ "inputUpsertKey" : [ 0 ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`b` BIGINT, `a` BIGINT NOT NULL>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[b, a])"
+ } ],
+ "edges" : [ {
+ "source" : 18,
+ "target" : 19,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 19,
+ "target" : 20,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 20,
+ "target" : 21,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 21,
+ "target" : 22,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 22,
+ "target" : 23,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/savepoint/_metadata
new file mode 100644
index 00000000000..1afe7af6919
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-proc-time/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/plan/mini-batch-assigner-row-time.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/plan/mini-batch-assigner-row-time.json
new file mode 100644
index 00000000000..7870d46a833
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/plan/mini-batch-assigner-row-time.json
@@ -0,0 +1,854 @@
+{
+ "flinkVersion" : "1.19",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_one_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "ts",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "id",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "num",
+ "dataType" : "INT"
+ }, {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "row_time",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "TO_TIMESTAMP(`ts`)"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "row_time",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`row_time` - INTERVAL '1' SECOND"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`ts` VARCHAR(2147483647), `id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, source_one_t]], fields=[ts, id, num, name])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647), `row_time` TIMESTAMP(3)>",
+ "description" : "Calc(select=[id, num, name, TO_TIMESTAMP(ts) AS row_time])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 3,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "id",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "num",
+ "fieldType" : "INT"
+ }, {
+ "name" : "name",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "row_time",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 1000:INTERVAL SECOND)])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-mini-batch-assigner_1",
+ "miniBatchInterval" : {
+ "interval" : 1000,
+ "mode" : "RowTime"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "id",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "num",
+ "fieldType" : "INT"
+ }, {
+ "name" : "name",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "row_time",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "MiniBatchAssigner(interval=[1000ms], mode=[RowTime])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-window-table-function_1",
+ "configuration" : {
+ "table.local-time-zone" : "default"
+ },
+ "windowing" : {
+ "strategy" : "TimeAttribute",
+ "window" : {
+ "type" : "TumblingWindow",
+ "size" : "PT1S"
+ },
+ "timeAttributeType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ },
+ "timeAttributeIndex" : 3,
+ "isRowtime" : true
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "id",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "num",
+ "fieldType" : "INT"
+ }, {
+ "name" : "name",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "row_time",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "window_start",
+ "fieldType" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "name" : "window_end",
+ "fieldType" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "name" : "window_time",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[1 s])])"
+ }, {
+ "id" : 6,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : "TIMESTAMP(3) NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>",
+ "description" : "Calc(select=[id, num, name, window_start, window_end])"
+ }, {
+ "id" : 7,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 2 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>",
+ "description" : "Exchange(distribution=[hash[name]])"
+ }, {
+ "id" : 8,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_two_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "ts",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "id",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "num",
+ "dataType" : "INT"
+ }, {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "row_time",
+ "kind" : "COMPUTED",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "TO_TIMESTAMP(`ts`)"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "row_time",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "serializableString" : "`row_time` - INTERVAL '1' SECOND"
+ }
+ } ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`ts` VARCHAR(2147483647), `id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, source_two_t]], fields=[ts, id, num, name])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 9,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$TO_TIMESTAMP$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647), `row_time` TIMESTAMP(3)>",
+ "description" : "Calc(select=[id, num, name, TO_TIMESTAMP(ts) AS row_time])"
+ }, {
+ "id" : 10,
+ "type" : "stream-exec-watermark-assigner_1",
+ "watermarkExpr" : {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1000",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3)"
+ },
+ "rowtimeFieldIndex" : 3,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "id",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "num",
+ "fieldType" : "INT"
+ }, {
+ "name" : "name",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "row_time",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 1000:INTERVAL SECOND)])"
+ }, {
+ "id" : 11,
+ "type" : "stream-exec-mini-batch-assigner_1",
+ "miniBatchInterval" : {
+ "interval" : 1000,
+ "mode" : "RowTime"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "id",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "num",
+ "fieldType" : "INT"
+ }, {
+ "name" : "name",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "row_time",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "MiniBatchAssigner(interval=[1000ms], mode=[RowTime])"
+ }, {
+ "id" : 12,
+ "type" : "stream-exec-window-table-function_1",
+ "configuration" : {
+ "table.local-time-zone" : "default"
+ },
+ "windowing" : {
+ "strategy" : "TimeAttribute",
+ "window" : {
+ "type" : "TumblingWindow",
+ "size" : "PT1S"
+ },
+ "timeAttributeType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ },
+ "timeAttributeIndex" : 3,
+ "isRowtime" : true
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "id",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "num",
+ "fieldType" : "INT"
+ }, {
+ "name" : "name",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "row_time",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ }, {
+ "name" : "window_start",
+ "fieldType" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "name" : "window_end",
+ "fieldType" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "name" : "window_time",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "WindowTableFunction(window=[TUMBLE(time_col=[row_time], size=[1 s])])"
+ }, {
+ "id" : 13,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : "TIMESTAMP(3) NOT NULL"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>",
+ "description" : "Calc(select=[id, num, name, window_start, window_end])"
+ }, {
+ "id" : 14,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 2 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL>",
+ "description" : "Exchange(distribution=[hash[name]])"
+ }, {
+ "id" : 15,
+ "type" : "stream-exec-window-join_1",
+ "configuration" : {
+ "table.local-time-zone" : "default"
+ },
+ "joinSpec" : {
+ "joinType" : "INNER",
+ "leftKeys" : [ 2 ],
+ "rightKeys" : [ 2 ],
+ "filterNulls" : [ true ],
+ "nonEquiCondition" : null
+ },
+ "leftWindowing" : {
+ "strategy" : "WindowAttached",
+ "window" : {
+ "type" : "TumblingWindow",
+ "size" : "PT1S"
+ },
+ "timeAttributeType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ },
+ "windowStart" : 3,
+ "windowEnd" : 4,
+ "isRowtime" : true
+ },
+ "rightWindowing" : {
+ "strategy" : "WindowAttached",
+ "window" : {
+ "type" : "TumblingWindow",
+ "size" : "PT1S"
+ },
+ "timeAttributeType" : {
+ "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ },
+ "windowStart" : 3,
+ "windowEnd" : 4,
+ "isRowtime" : true
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ }, {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`id` VARCHAR(2147483647), `num` INT, `name` VARCHAR(2147483647), `window_start` TIMESTAMP(3) NOT NULL, `window_end` TIMESTAMP(3) NOT NULL, `id0` VARCHAR(2147483647), `num0` INT, `name0` VARCHAR(2147483647), `window_start0` TIMESTAMP(3) NOT NULL, `window_end0` TIMESTAMP(3) NOT NULL>",
+ "description" : "WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 s])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 s])], joinType=[InnerJoin], where=[(name = name0)], select=[id, num, name, window_start, window_end, id0, num0, name0, window_start0, window_end0])"
+ }, {
+ "id" : 16,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 4,
+ "type" : "TIMESTAMP(3) NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 5,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 6,
+ "type" : "INT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end`
TIMESTAMP(3) NOT NULL, `name` VARCHAR(2147483647), `L_id` VARCHAR(2147483647),
`L_num` INT, `R_id` VARCHAR(2147483647), `R_num` INT>",
+ "description" : "Calc(select=[window_start, window_end, name, id AS L_id,
num AS L_num, id0 AS R_id, num0 AS R_num])"
+ }, {
+ "id" : 17,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "window_start",
+ "dataType" : "TIMESTAMP(3)"
+ }, {
+ "name" : "window_end",
+ "dataType" : "TIMESTAMP(3)"
+ }, {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "L_id",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "L_num",
+ "dataType" : "INT"
+ }, {
+ "name" : "R_id",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "R_num",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`window_start` TIMESTAMP(3) NOT NULL, `window_end`
TIMESTAMP(3) NOT NULL, `name` VARCHAR(2147483647), `L_id` VARCHAR(2147483647),
`L_num` INT, `R_id` VARCHAR(2147483647), `R_num` INT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[window_start, window_end, name, L_id, L_num, R_id, R_num])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 8,
+ "target" : 9,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 9,
+ "target" : 10,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 10,
+ "target" : 11,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 11,
+ "target" : 12,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 12,
+ "target" : 13,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 13,
+ "target" : 14,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 15,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 14,
+ "target" : 15,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 15,
+ "target" : 16,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 16,
+ "target" : 17,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/savepoint/_metadata
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/savepoint/_metadata
new file mode 100644
index 00000000000..804773b2a4a
Binary files /dev/null and
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-mini-batch-assigner_1/mini-batch-assigner-row-time/savepoint/_metadata
differ