This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 99bf496b41c [FLINK-36049][table-planner] Add CompiledPlan annotations to BatchExecSortLimit
99bf496b41c is described below
commit 99bf496b41cc30c5b633927a276144e84a642545
Author: James Hughes <[email protected]>
AuthorDate: Fri Aug 16 13:28:27 2024 -0400
[FLINK-36049][table-planner] Add CompiledPlan annotations to BatchExecSortLimit
---
.../plan/nodes/exec/batch/BatchExecSortLimit.java | 48 ++++-
.../planner/plan/utils/ExecNodeMetadataUtil.java | 2 +
.../exec/batch/SortLimitBatchRestoreTest.java | 39 +++++
.../plan/nodes/exec/common/SortTestPrograms.java | 4 +
.../sort-limit-asc/plan/sort-limit-asc.json | 194 +++++++++++++++++++++
.../sort-limit-desc/plan/sort-limit-desc.json | 194 +++++++++++++++++++++
6 files changed, 479 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java
index bf3de44aee9..6de8cf650af 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.batch;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -29,6 +30,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
@@ -38,19 +40,43 @@ import
org.apache.flink.table.runtime.operators.sort.SortLimitOperator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecLimit.FIELD_NAME_IS_GLOBAL;
+import static
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecLimit.FIELD_NAME_LIMIT_END;
+import static
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecLimit.FIELD_NAME_LIMIT_START;
+import static
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort.FIELD_NAME_SORT_SPEC;
/**
* {@link BatchExecNode} for Sort with limit.
*
* <p>This node will output data rank from `limitStart` to `limitEnd`.
*/
+@ExecNodeMetadata(
+ name = "batch-exec-sort-limit",
+ version = 1,
+ producedTransformations = BatchExecSortLimit.SORT_LIMIT_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v2_0,
+ minStateVersion = FlinkVersion.v2_0)
public class BatchExecSortLimit extends ExecNodeBase<RowData>
implements BatchExecNode<RowData>,
SingleTransformationTranslator<RowData> {
+ public static final String SORT_LIMIT_TRANSFORMATION = "sort-limit";
+
+ @JsonProperty(FIELD_NAME_SORT_SPEC)
private final SortSpec sortSpec;
+
+ @JsonProperty(FIELD_NAME_LIMIT_START)
private final long limitStart;
+
+ @JsonProperty(FIELD_NAME_LIMIT_END)
private final long limitEnd;
+
+ @JsonProperty(FIELD_NAME_IS_GLOBAL)
private final boolean isGlobal;
public BatchExecSortLimit(
@@ -75,6 +101,25 @@ public class BatchExecSortLimit extends
ExecNodeBase<RowData>
this.isGlobal = isGlobal;
}
+ @JsonCreator
+ public BatchExecSortLimit(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig
persistedConfig,
+ @JsonProperty(FIELD_NAME_SORT_SPEC) SortSpec sortSpec,
+ @JsonProperty(FIELD_NAME_LIMIT_START) long limitStart,
+ @JsonProperty(FIELD_NAME_LIMIT_END) long limitEnd,
+ @JsonProperty(FIELD_NAME_IS_GLOBAL) boolean isGlobal,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+ super(id, context, persistedConfig, inputProperties, outputType,
description);
+ this.sortSpec = sortSpec;
+ this.limitStart = limitStart;
+ this.limitEnd = limitEnd;
+ this.isGlobal = isGlobal;
+ }
+
@SuppressWarnings("unchecked")
@Override
protected Transformation<RowData> translateToPlanInternal(
@@ -103,8 +148,7 @@ public class BatchExecSortLimit extends
ExecNodeBase<RowData>
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
- createTransformationName(config),
- createTransformationDescription(config),
+ createTransformationMeta(SORT_LIMIT_TRANSFORMATION, config),
SimpleOperatorFactory.of(operator),
InternalTypeInfo.of(inputType),
inputTransform.getParallelism(),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index 1fc15ce9850..7e4f5e36838 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -39,6 +39,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNestedLoopJ
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
import
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortLimit;
import
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecUnion;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues;
@@ -177,6 +178,7 @@ public final class ExecNodeMetadataUtil {
add(BatchExecHashAggregate.class);
add(BatchExecExpand.class);
add(BatchExecSortAggregate.class);
+ add(BatchExecSortLimit.class);
}
};
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/SortLimitBatchRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/SortLimitBatchRestoreTest.java
new file mode 100644
index 00000000000..9adeae2ad6a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/SortLimitBatchRestoreTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.table.planner.plan.nodes.exec.common.SortTestPrograms;
+import
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Batch Compiled Plan tests for {@link BatchExecSortLimit}. */
+public class SortLimitBatchRestoreTest extends BatchRestoreTestBase {
+
+ public SortLimitBatchRestoreTest() {
+ super(BatchExecSortLimit.class);
+ }
+
+ @Override
+ public List<TableTestProgram> programs() {
+ return Arrays.asList(SortTestPrograms.SORT_LIMIT_ASC,
SortTestPrograms.SORT_LIMIT_DESC);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/SortTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/SortTestPrograms.java
index c0e39fb1469..51fe843c52d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/SortTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/SortTestPrograms.java
@@ -84,6 +84,8 @@ public class SortTestPrograms {
"+I[2, a, 6]",
"-D[2, a, 6]",
"+I[1, a, 5]")
+ .expectedMaterializedStrings(
+ "+I[1, a, 5]", "+I[1, a, 5]",
"+I[2, a, 6]")
.build())
.runSql("INSERT INTO sink_t SELECT * from source_t ORDER
BY a LIMIT 3")
.build();
@@ -122,6 +124,8 @@ public class SortTestPrograms {
// [5, c, 9]
// [6, c, 10] [6, c, 10]
.consumedAfterRestore("-D[4, b, 8]",
"+I[6, c, 10]")
+ .expectedMaterializedStrings(
+ "+I[6, c, 10]", "+I[6, c, 10]",
"+I[5, c, 9]")
.build())
.runSql("INSERT INTO sink_t SELECT * from source_t ORDER
BY a DESC LIMIT 3")
.build();
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort-limit_1/sort-limit-asc/plan/sort-limit-asc.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort-limit_1/sort-limit-asc/plan/sort-limit-asc.json
new file mode 100644
index 00000000000..7a5164d44ef
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort-limit_1/sort-limit-asc/plan/sort-limit-asc.json
@@ -0,0 +1,194 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])",
+ "dynamicFilteringDataListenerID" : "0aa49602-aade-495b-bfe4-07dabee92141",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "batch-exec-sort-limit_1",
+ "sortSpec" : {
+ "fields" : [ {
+ "index" : 0,
+ "isAscending" : true,
+ "nullIsLast" : false
+ } ]
+ },
+ "limitStart" : 0,
+ "limitEnd" : 3,
+ "isGlobal" : false,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "END_INPUT",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+ "description" : "SortLimit(orderBy=[a ASC], offset=[0], fetch=[3], global=[false])"
+ }, {
+ "id" : 3,
+ "type" : "batch-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+ "description" : "Exchange(distribution=[single])",
+ "requiredExchangeMode" : "UNDEFINED"
+ }, {
+ "id" : 4,
+ "type" : "batch-exec-sort-limit_1",
+ "sortSpec" : {
+ "fields" : [ {
+ "index" : 0,
+ "isAscending" : true,
+ "nullIsLast" : false
+ } ]
+ },
+ "limitStart" : 0,
+ "limitEnd" : 3,
+ "isGlobal" : true,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "END_INPUT",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+ "description" : "SortLimit(orderBy=[a ASC], offset=[0], fetch=[3], global=[true])"
+ }, {
+ "id" : 5,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+ "description" : "Calc(select=[a, b, CAST(c AS BIGINT) AS c])"
+ }, {
+ "id" : 6,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort-limit_1/sort-limit-desc/plan/sort-limit-desc.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort-limit_1/sort-limit-desc/plan/sort-limit-desc.json
new file mode 100644
index 00000000000..f22f5fd060f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort-limit_1/sort-limit-desc/plan/sort-limit-desc.json
@@ -0,0 +1,194 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 7,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c",
+ "dataType" : "INT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])",
+ "dynamicFilteringDataListenerID" : "6b3acb30-80be-4b3f-b7c3-a56d415e2d1b",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 8,
+ "type" : "batch-exec-sort-limit_1",
+ "sortSpec" : {
+ "fields" : [ {
+ "index" : 0,
+ "isAscending" : false,
+ "nullIsLast" : true
+ } ]
+ },
+ "limitStart" : 0,
+ "limitEnd" : 3,
+ "isGlobal" : false,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "END_INPUT",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+ "description" : "SortLimit(orderBy=[a DESC], offset=[0], fetch=[3], global=[false])"
+ }, {
+ "id" : 9,
+ "type" : "batch-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+ "description" : "Exchange(distribution=[single])",
+ "requiredExchangeMode" : "UNDEFINED"
+ }, {
+ "id" : 10,
+ "type" : "batch-exec-sort-limit_1",
+ "sortSpec" : {
+ "fields" : [ {
+ "index" : 0,
+ "isAscending" : false,
+ "nullIsLast" : true
+ } ]
+ },
+ "limitStart" : 0,
+ "limitEnd" : 3,
+ "isGlobal" : true,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "END_INPUT",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+ "description" : "SortLimit(orderBy=[a DESC], offset=[0], fetch=[3], global=[true])"
+ }, {
+ "id" : 11,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "INT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$CAST$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "INT"
+ } ],
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+ "description" : "Calc(select=[a, b, CAST(c AS BIGINT) AS c])"
+ }, {
+ "id" : 12,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "INT"
+ }, {
+ "name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "c",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])"
+ } ],
+ "edges" : [ {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 8,
+ "target" : 9,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 9,
+ "target" : 10,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 10,
+ "target" : 11,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 11,
+ "target" : 12,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file