This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 783150aa9ab [FLINK-35942][table-planner] Add CompiledPlan annotations to BatchExecCorrelate
783150aa9ab is described below

commit 783150aa9abc9b26f06db04b6807d0a57ae64579
Author: James Hughes <[email protected]>
AuthorDate: Wed Aug 7 11:55:57 2024 -0400

    [FLINK-35942][table-planner] Add CompiledPlan annotations to BatchExecCorrelate
---
 .../plan/nodes/exec/batch/BatchExecCorrelate.java  |  37 ++++
 .../planner/plan/utils/ExecNodeMetadataUtil.java   |   2 +
 .../CorrelateBatchRestoreTest.java}                |  13 +-
 .../{stream => common}/CorrelateTestPrograms.java  |  12 +-
 .../nodes/exec/stream/CorrelateRestoreTest.java    |   1 +
 .../plan/correlate-catalog-func.json               | 142 ++++++++++++++
 .../plan/correlate-cross-join-unnest.json          | 135 +++++++++++++
 .../plan/correlate-join-filter.json                | 209 +++++++++++++++++++++
 .../plan/correlate-left-join.json                  | 138 ++++++++++++++
 .../plan/correlate-system-func.json                | 142 ++++++++++++++
 10 files changed, 819 insertions(+), 12 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
index 3b5450c0941..c993a7943cb 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
@@ -18,23 +18,35 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.FlinkVersion;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCorrelate;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import org.apache.flink.table.types.logical.RowType;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
 
 import javax.annotation.Nullable;
 
 import java.util.Collections;
+import java.util.List;
 
 /** Batch exec node which matches along with join a Java/Scala user defined 
table function. */
+@ExecNodeMetadata(
+        name = "batch-exec-correlate",
+        version = 1,
+        producedTransformations = CommonExecCorrelate.CORRELATE_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v2_0,
+        minStateVersion = FlinkVersion.v2_0)
 public class BatchExecCorrelate extends CommonExecCorrelate implements 
BatchExecNode<RowData> {
 
     public BatchExecCorrelate(
@@ -58,4 +70,29 @@ public class BatchExecCorrelate extends CommonExecCorrelate 
implements BatchExec
                 outputType,
                 description);
     }
+
+    @JsonCreator
+    public BatchExecCorrelate(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig 
persistedConfig,
+            @JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType joinType,
+            @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation,
+            @JsonProperty(FIELD_NAME_CONDITION) @Nullable RexNode condition,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(
+                id,
+                context,
+                persistedConfig,
+                joinType,
+                (RexCall) invocation,
+                condition,
+                TableStreamOperator.class,
+                false, // retainHeader
+                inputProperties,
+                outputType,
+                description);
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index bd6a7d4e1da..9f2ec090968 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.MultipleExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCorrelate;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
@@ -161,6 +162,7 @@ public final class ExecNodeMetadataUtil {
                     add(BatchExecExchange.class);
                     add(BatchExecSort.class);
                     add(BatchExecValues.class);
+                    add(BatchExecCorrelate.class);
                 }
             };
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CorrelateBatchRestoreTest.java
similarity index 78%
copy from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
copy to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CorrelateBatchRestoreTest.java
index cc24919cdca..32325b16df2 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CorrelateBatchRestoreTest.java
@@ -16,19 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
-import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.CorrelateTestPrograms;
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Arrays;
 import java.util.List;
 
-/** Restore tests for {@link StreamExecCorrelate}. */
-public class CorrelateRestoreTest extends RestoreTestBase {
+/** Batch Compiled Plan tests for {@link BatchExecCorrelate}. */
+public class CorrelateBatchRestoreTest extends BatchRestoreTestBase {
 
-    public CorrelateRestoreTest() {
-        super(StreamExecCorrelate.class);
+    public CorrelateBatchRestoreTest() {
+        super(BatchExecCorrelate.class);
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
similarity index 96%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateTestPrograms.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
index d1a2a1e46e3..09e46a20002 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
 
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.StringSplit;
 import org.apache.flink.table.planner.utils.TableFunc1;
@@ -36,7 +36,7 @@ public class CorrelateTestPrograms {
 
     static final String[] SOURCE_SCHEMA = {"a BIGINT", "b INT NOT NULL", "c 
VARCHAR"};
 
-    static final TableTestProgram CORRELATE_CATALOG_FUNC =
+    public static final TableTestProgram CORRELATE_CATALOG_FUNC =
             TableTestProgram.of(
                             "correlate-catalog-func",
                             "validate correlate with temporary catalog 
function")
@@ -65,7 +65,7 @@ public class CorrelateTestPrograms {
                             "INSERT INTO sink_t SELECT c, s FROM source_t, 
LATERAL TABLE(func1(c, '$')) AS T(s)")
                     .build();
 
-    static final TableTestProgram CORRELATE_SYSTEM_FUNC =
+    public static final TableTestProgram CORRELATE_SYSTEM_FUNC =
             TableTestProgram.of(
                             "correlate-system-func",
                             "validate correlate with temporary system 
function")
@@ -94,7 +94,7 @@ public class CorrelateTestPrograms {
                             "INSERT INTO sink_t SELECT c, s FROM source_t, 
LATERAL TABLE(STRING_SPLIT(c, '#')) AS T(s)")
                     .build();
 
-    static final TableTestProgram CORRELATE_JOIN_FILTER =
+    public static final TableTestProgram CORRELATE_JOIN_FILTER =
             TableTestProgram.of("correlate-join-filter", "validate correlate 
with join and filter")
                     .setupTemporaryCatalogFunction("func1", TableFunc1.class)
                     .setupTableSource(
@@ -114,7 +114,7 @@ public class CorrelateTestPrograms {
                             "INSERT INTO sink_t SELECT * FROM (SELECT c, s 
FROM source_t, LATERAL TABLE(func1(c)) AS T(s)) AS T2 WHERE c LIKE '%hello%' OR 
c LIKE '%fiz%'")
                     .build();
 
-    static final TableTestProgram CORRELATE_LEFT_JOIN =
+    public static final TableTestProgram CORRELATE_LEFT_JOIN =
             TableTestProgram.of("correlate-left-join", "validate correlate 
with left join")
                     .setupTemporaryCatalogFunction("func1", TableFunc1.class)
                     .setupTableSource(
@@ -141,7 +141,7 @@ public class CorrelateTestPrograms {
                             "INSERT INTO sink_t SELECT c, s FROM source_t LEFT 
JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE")
                     .build();
 
-    static final TableTestProgram CORRELATE_CROSS_JOIN_UNNEST =
+    public static final TableTestProgram CORRELATE_CROSS_JOIN_UNNEST =
             TableTestProgram.of(
                             "correlate-cross-join-unnest",
                             "validate correlate with cross join and unnest")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
index cc24919cdca..02d487a80b5 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.CorrelateTestPrograms;
 import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
 import org.apache.flink.table.test.program.TableTestProgram;
 
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-catalog-func/plan/correlate-catalog-func.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-catalog-func/plan/correlate-catalog-func.json
new file mode 100644
index 00000000000..69623d2cd8f
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-catalog-func/plan/correlate-catalog-func.json
@@ -0,0 +1,142 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "12f3bf79-0412-46aa-a3f2-4ed78bc05e75",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "batch-exec-correlate_1",
+    "joinType" : "INNER",
+    "functionCall" : {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`func1`",
+      "operands" : [ {
+        "kind" : "FIELD_ACCESS",
+        "name" : "c",
+        "expr" : {
+          "kind" : "CORREL_VARIABLE",
+          "correl" : "$cor0",
+          "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`s` VARCHAR(2147483647)> NOT NULL"
+        }
+      }, {
+        "kind" : "LITERAL",
+        "value" : "$",
+        "type" : "CHAR(1) NOT NULL"
+      } ],
+      "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
+    },
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`EXPR$0` VARCHAR(2147483647)>",
+    "description" : "Correlate(invocation=[func1($cor0.c, _UTF-16LE'$')], 
correlate=[table(func1($cor0.c,'$'))], select=[a,b,c,EXPR$0], 
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])"
+  }, {
+    "id" : 3,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[c, EXPR$0 AS s])"
+  }, {
+    "id" : 4,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[c, s])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest/plan/correlate-cross-join-unnest.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest/plan/correlate-cross-join-unnest.json
new file mode 100644
index 00000000000..3e1dfd36382
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest/plan/correlate-cross-join-unnest.json
@@ -0,0 +1,135 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 18,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "arr",
+              "dataType" : "ARRAY<ROW<`nested` VARCHAR(2147483647)>>"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `arr` ARRAY<ROW<`nested` 
VARCHAR(2147483647)>>>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[name, arr])",
+    "dynamicFilteringDataListenerID" : "6956ae02-b818-4915-8709-b0dacd1e40ef",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 19,
+    "type" : "batch-exec-correlate_1",
+    "joinType" : "INNER",
+    "functionCall" : {
+      "kind" : "CALL",
+      "internalName" : "$UNNEST_ROWS$1",
+      "operands" : [ {
+        "kind" : "FIELD_ACCESS",
+        "name" : "arr",
+        "expr" : {
+          "kind" : "CORREL_VARIABLE",
+          "correl" : "$cor0",
+          "type" : "ROW<`name` VARCHAR(2147483647), `arr` ARRAY<ROW<`nested` 
VARCHAR(2147483647)>>, `nested` VARCHAR(2147483647)> NOT NULL"
+        }
+      } ],
+      "type" : "ROW<`nested` VARCHAR(2147483647)>"
+    },
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `arr` ARRAY<ROW<`nested` 
VARCHAR(2147483647)>>, `nested` VARCHAR(2147483647)>",
+    "description" : "Correlate(invocation=[$UNNEST_ROWS$1($cor0.arr)], 
correlate=[table($UNNEST_ROWS$1($cor0.arr))], select=[name,arr,nested], 
rowType=[RecordType(VARCHAR(2147483647) name, 
RecordType:peek_no_expand(VARCHAR(2147483647) nested) ARRAY arr, 
VARCHAR(2147483647) nested)], joinType=[INNER])"
+  }, {
+    "id" : 20,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `nested` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[name, nested])"
+  }, {
+    "id" : 21,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "nested",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `nested` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[name, nested])"
+  } ],
+  "edges" : [ {
+    "source" : 18,
+    "target" : 19,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 19,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 20,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-join-filter/plan/correlate-join-filter.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-join-filter/plan/correlate-join-filter.json
new file mode 100644
index 00000000000..1528204c18f
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-join-filter/plan/correlate-join-filter.json
@@ -0,0 +1,209 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 9,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t, filter=[]]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "5f166bc0-c35d-48d4-b73f-5942fa8e2348",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 10,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "INT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$OR$1",
+      "operands" : [ {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$LIKE$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : "VARCHAR(2147483647)"
+        }, {
+          "kind" : "LITERAL",
+          "value" : "%hello%",
+          "type" : "CHAR(7) NOT NULL"
+        } ],
+        "type" : "BOOLEAN"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$LIKE$1",
+        "operands" : [ {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : "VARCHAR(2147483647)"
+        }, {
+          "kind" : "LITERAL",
+          "value" : "%fiz%",
+          "type" : "CHAR(5) NOT NULL"
+        } ],
+        "type" : "BOOLEAN"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, b, c], where=[(LIKE(c, '%hello%') OR 
LIKE(c, '%fiz%'))])"
+  }, {
+    "id" : 11,
+    "type" : "batch-exec-correlate_1",
+    "joinType" : "INNER",
+    "functionCall" : {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`func1`",
+      "operands" : [ {
+        "kind" : "FIELD_ACCESS",
+        "name" : "c",
+        "expr" : {
+          "kind" : "CORREL_VARIABLE",
+          "correl" : "$cor0",
+          "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`s` VARCHAR(2147483647)> NOT NULL"
+        }
+      } ],
+      "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
+    },
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`EXPR$0` VARCHAR(2147483647)>",
+    "description" : "Correlate(invocation=[func1($cor0.c)], 
correlate=[table(func1($cor0.c))], select=[a,b,c,EXPR$0], 
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])"
+  }, {
+    "id" : 12,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[c, EXPR$0 AS s])"
+  }, {
+    "id" : 13,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[c, s])"
+  } ],
+  "edges" : [ {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-left-join/plan/correlate-left-join.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-left-join/plan/correlate-left-join.json
new file mode 100644
index 00000000000..52d3936d931
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-left-join/plan/correlate-left-join.json
@@ -0,0 +1,138 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 14,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "f78696e3-bba1-4edf-9d9b-3a0c59f3deed",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 15,
+    "type" : "batch-exec-correlate_1",
+    "joinType" : "LEFT",
+    "functionCall" : {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`func1`",
+      "operands" : [ {
+        "kind" : "FIELD_ACCESS",
+        "name" : "c",
+        "expr" : {
+          "kind" : "CORREL_VARIABLE",
+          "correl" : "$cor0",
+          "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`s` VARCHAR(2147483647)> NOT NULL"
+        }
+      } ],
+      "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
+    },
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`EXPR$0` VARCHAR(2147483647)>",
+    "description" : "Correlate(invocation=[func1($cor0.c)], 
correlate=[table(func1($cor0.c))], select=[a,b,c,EXPR$0], 
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) EXPR$0)], joinType=[LEFT])"
+  }, {
+    "id" : 16,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[c, EXPR$0 AS s])"
+  }, {
+    "id" : 17,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[c, s])"
+  } ],
+  "edges" : [ {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 15,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 16,
+    "target" : 17,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-system-func/plan/correlate-system-func.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-system-func/plan/correlate-system-func.json
new file mode 100644
index 00000000000..a1f287232a6
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-system-func/plan/correlate-system-func.json
@@ -0,0 +1,142 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 5,
+    "type" : "batch-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "dynamicFilteringDataListenerID" : "c180695a-cc29-44d8-b769-6dd6ed9fee20",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 6,
+    "type" : "batch-exec-correlate_1",
+    "joinType" : "INNER",
+    "functionCall" : {
+      "kind" : "CALL",
+      "systemName" : "STRING_SPLIT",
+      "operands" : [ {
+        "kind" : "FIELD_ACCESS",
+        "name" : "c",
+        "expr" : {
+          "kind" : "CORREL_VARIABLE",
+          "correl" : "$cor0",
+          "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`s` VARCHAR(2147483647)> NOT NULL"
+        }
+      }, {
+        "kind" : "LITERAL",
+        "value" : "#",
+        "type" : "CHAR(1) NOT NULL"
+      } ],
+      "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
+    },
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`EXPR$0` VARCHAR(2147483647)>",
+    "description" : "Correlate(invocation=[STRING_SPLIT($cor0.c, 
_UTF-16LE'#')], correlate=[table(STRING_SPLIT($cor0.c,'#'))], 
select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, 
VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])"
+  }, {
+    "id" : 7,
+    "type" : "batch-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "VARCHAR(2147483647)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[c, EXPR$0 AS s])"
+  }, {
+    "id" : 8,
+    "type" : "batch-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "BLOCKING",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[c, s])"
+  } ],
+  "edges" : [ {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file


Reply via email to