This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 99bf496b41c [FLINK-36049][table-planner] Add CompiledPlan annotations to BatchExecSortLimit
99bf496b41c is described below

commit 99bf496b41cc30c5b633927a276144e84a642545
Author: James Hughes <[email protected]>
AuthorDate: Fri Aug 16 13:28:27 2024 -0400

    [FLINK-36049][table-planner] Add CompiledPlan annotations to BatchExecSortLimit
---
 .../plan/nodes/exec/batch/BatchExecSortLimit.java  |  48 ++++-
 .../planner/plan/utils/ExecNodeMetadataUtil.java   |   2 +
 .../exec/batch/SortLimitBatchRestoreTest.java      |  39 +++++
 .../plan/nodes/exec/common/SortTestPrograms.java   |   4 +
 .../sort-limit-asc/plan/sort-limit-asc.json        | 194 +++++++++++++++++++++
 .../sort-limit-desc/plan/sort-limit-desc.json      | 194 +++++++++++++++++++++
 6 files changed, 479 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java
index bf3de44aee9..6de8cf650af 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortLimit.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
@@ -29,6 +30,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
@@ -38,19 +40,43 @@ import org.apache.flink.table.runtime.operators.sort.SortLimitOperator;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecLimit.FIELD_NAME_IS_GLOBAL;
+import static org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecLimit.FIELD_NAME_LIMIT_END;
+import static org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecLimit.FIELD_NAME_LIMIT_START;
+import static org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort.FIELD_NAME_SORT_SPEC;
 
 /**
  * {@link BatchExecNode} for Sort with limit.
  *
  * <p>This node will output data rank from `limitStart` to `limitEnd`.
  */
+@ExecNodeMetadata(
+        name = "batch-exec-sort-limit",
+        version = 1,
+        producedTransformations = BatchExecSortLimit.SORT_LIMIT_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
 public class BatchExecSortLimit extends ExecNodeBase<RowData>
         implements BatchExecNode<RowData>, SingleTransformationTranslator<RowData> {
 
+    public static final String SORT_LIMIT_TRANSFORMATION = "sort-limit";
+
+    @JsonProperty(FIELD_NAME_SORT_SPEC)
     private final SortSpec sortSpec;
+
+    @JsonProperty(FIELD_NAME_LIMIT_START)
     private final long limitStart;
+
+    @JsonProperty(FIELD_NAME_LIMIT_END)
     private final long limitEnd;
+
+    @JsonProperty(FIELD_NAME_IS_GLOBAL)
     private final boolean isGlobal;
 
     public BatchExecSortLimit(
@@ -75,6 +101,25 @@ public class BatchExecSortLimit extends ExecNodeBase<RowData>
         this.isGlobal = isGlobal;
     }
 
+    @JsonCreator
+    public BatchExecSortLimit(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+            @JsonProperty(FIELD_NAME_SORT_SPEC) SortSpec sortSpec,
+            @JsonProperty(FIELD_NAME_LIMIT_START) long limitStart,
+            @JsonProperty(FIELD_NAME_LIMIT_END) long limitEnd,
+            @JsonProperty(FIELD_NAME_IS_GLOBAL) boolean isGlobal,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(id, context, persistedConfig, inputProperties, outputType, description);
+        this.sortSpec = sortSpec;
+        this.limitStart = limitStart;
+        this.limitEnd = limitEnd;
+        this.isGlobal = isGlobal;
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     protected Transformation<RowData> translateToPlanInternal(
@@ -103,8 +148,7 @@ public class BatchExecSortLimit extends ExecNodeBase<RowData>
 
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
-                createTransformationName(config),
-                createTransformationDescription(config),
+                createTransformationMeta(SORT_LIMIT_TRANSFORMATION, config),
                 SimpleOperatorFactory.of(operator),
                 InternalTypeInfo.of(inputType),
                 inputTransform.getParallelism(),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index 1fc15ce9850..7e4f5e36838 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNestedLoopJ
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSortLimit;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecUnion;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecValues;
@@ -177,6 +178,7 @@ public final class ExecNodeMetadataUtil {
                     add(BatchExecHashAggregate.class);
                     add(BatchExecExpand.class);
                     add(BatchExecSortAggregate.class);
+                    add(BatchExecSortLimit.class);
                 }
             };
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/SortLimitBatchRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/SortLimitBatchRestoreTest.java
new file mode 100644
index 00000000000..9adeae2ad6a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/SortLimitBatchRestoreTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.table.planner.plan.nodes.exec.common.SortTestPrograms;
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Batch Compiled Plan tests for {@link BatchExecSortLimit}. */
+public class SortLimitBatchRestoreTest extends BatchRestoreTestBase {
+
+    public SortLimitBatchRestoreTest() {
+        super(BatchExecSortLimit.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(SortTestPrograms.SORT_LIMIT_ASC, SortTestPrograms.SORT_LIMIT_DESC);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/SortTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/SortTestPrograms.java
index c0e39fb1469..51fe843c52d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/SortTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/SortTestPrograms.java
@@ -84,6 +84,8 @@ public class SortTestPrograms {
                                             "+I[2, a, 6]",
                                             "-D[2, a, 6]",
                                             "+I[1, a, 5]")
+                                    .expectedMaterializedStrings(
+                                            "+I[1, a, 5]", "+I[1, a, 5]", "+I[2, a, 6]")
                                     .build())
                     .runSql("INSERT INTO sink_t SELECT * from source_t ORDER BY a LIMIT 3")
                     .build();
@@ -122,6 +124,8 @@ public class SortTestPrograms {
                                     //       [5, c, 9]
                                     // [6, c, 10]  [6, c, 10]
                                     .consumedAfterRestore("-D[4, b, 8]", "+I[6, c, 10]")
+                                    .expectedMaterializedStrings(
+                                            "+I[6, c, 10]", "+I[6, c, 10]", "+I[5, c, 9]")
                                     .build())
                     .runSql("INSERT INTO sink_t SELECT * from source_t ORDER BY a DESC LIMIT 3")
                     .build();
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort-limit_1/sort-limit-asc/plan/sort-limit-asc.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort-limit_1/sort-limit-asc/plan/sort-limit-asc.json
new file mode 100644
index 00000000000..7a5164d44ef
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort-limit_1/sort-limit-asc/plan/sort-limit-asc.json
@@ -0,0 +1,194 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "0aa49602-aade-495b-bfe4-07dabee92141",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "batch-exec-sort-limit_1",
+    "sortSpec" : {
+      "fields" : [ {
+        "index" : 0,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "limitStart" : 0,
+    "limitEnd" : 3,
+    "isGlobal" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "END_INPUT",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "SortLimit(orderBy=[a ASC], offset=[0], fetch=[3], global=[false])"
+  }, {
+    "id" : 3,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Exchange(distribution=[single])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 4,
+    "type" : "batch-exec-sort-limit_1",
+    "sortSpec" : {
+      "fields" : [ {
+        "index" : 0,
+        "isAscending" : true,
+        "nullIsLast" : false
+      } ]
+    },
+    "limitStart" : 0,
+    "limitEnd" : 3,
+    "isGlobal" : true,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "END_INPUT",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "SortLimit(orderBy=[a ASC], offset=[0], fetch=[3], global=[true])"
+  }, {
+    "id" : 5,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Calc(select=[a, b, CAST(c AS BIGINT) AS c])"
+  }, {
+    "id" : 6,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort-limit_1/sort-limit-desc/plan/sort-limit-desc.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort-limit_1/sort-limit-desc/plan/sort-limit-desc.json
new file mode 100644
index 00000000000..f22f5fd060f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-sort-limit_1/sort-limit-desc/plan/sort-limit-desc.json
@@ -0,0 +1,194 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 7,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "6b3acb30-80be-4b3f-b7c3-a56d415e2d1b",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 8,
+    "type" : "batch-exec-sort-limit_1",
+    "sortSpec" : {
+      "fields" : [ {
+        "index" : 0,
+        "isAscending" : false,
+        "nullIsLast" : true
+      } ]
+    },
+    "limitStart" : 0,
+    "limitEnd" : 3,
+    "isGlobal" : false,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "END_INPUT",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "SortLimit(orderBy=[a DESC], offset=[0], fetch=[3], global=[false])"
+  }, {
+    "id" : 9,
+    "type" : "batch-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Exchange(distribution=[single])",
+    "requiredExchangeMode" : "UNDEFINED"
+  }, {
+    "id" : 10,
+    "type" : "batch-exec-sort-limit_1",
+    "sortSpec" : {
+      "fields" : [ {
+        "index" : 0,
+        "isAscending" : false,
+        "nullIsLast" : true
+      } ]
+    },
+    "limitStart" : 0,
+    "limitEnd" : 3,
+    "isGlobal" : true,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "SINGLETON"
+      },
+      "damBehavior" : "END_INPUT",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "SortLimit(orderBy=[a DESC], offset=[0], fetch=[3], global=[true])"
+  }, {
+    "id" : 11,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 2,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Calc(select=[a, b, CAST(c AS BIGINT) AS c])"
+  }, {
+    "id" : 12,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "BIGINT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file

Reply via email to