This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 783150aa9ab [FLINK-35942][table-planner] Add CompiledPlan annotations to BatchExecCorrelate
783150aa9ab is described below
commit 783150aa9abc9b26f06db04b6807d0a57ae64579
Author: James Hughes <[email protected]>
AuthorDate: Wed Aug 7 11:55:57 2024 -0400
[FLINK-35942][table-planner] Add CompiledPlan annotations to BatchExecCorrelate
---
.../plan/nodes/exec/batch/BatchExecCorrelate.java | 37 ++++
.../planner/plan/utils/ExecNodeMetadataUtil.java | 2 +
.../CorrelateBatchRestoreTest.java} | 13 +-
.../{stream => common}/CorrelateTestPrograms.java | 12 +-
.../nodes/exec/stream/CorrelateRestoreTest.java | 1 +
.../plan/correlate-catalog-func.json | 142 ++++++++++++++
.../plan/correlate-cross-join-unnest.json | 135 +++++++++++++
.../plan/correlate-join-filter.json | 209 +++++++++++++++++++++
.../plan/correlate-left-join.json | 138 ++++++++++++++
.../plan/correlate-system-func.json | 142 ++++++++++++++
10 files changed, 819 insertions(+), 12 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
index 3b5450c0941..c993a7943cb 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
@@ -18,23 +18,35 @@
package org.apache.flink.table.planner.plan.nodes.exec.batch;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCorrelate;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.types.logical.RowType;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.List;
/** Batch exec node which matches along with join a Java/Scala user defined
table function. */
+@ExecNodeMetadata(
+ name = "batch-exec-correlate",
+ version = 1,
+ producedTransformations = CommonExecCorrelate.CORRELATE_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v2_0,
+ minStateVersion = FlinkVersion.v2_0)
public class BatchExecCorrelate extends CommonExecCorrelate implements
BatchExecNode<RowData> {
public BatchExecCorrelate(
@@ -58,4 +70,29 @@ public class BatchExecCorrelate extends CommonExecCorrelate
implements BatchExec
outputType,
description);
}
+
+ @JsonCreator
+ public BatchExecCorrelate(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig
persistedConfig,
+ @JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType joinType,
+ @JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation,
+ @JsonProperty(FIELD_NAME_CONDITION) @Nullable RexNode condition,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty>
inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+ super(
+ id,
+ context,
+ persistedConfig,
+ joinType,
+ (RexCall) invocation,
+ condition,
+ TableStreamOperator.class,
+ false, // retainHeader
+ inputProperties,
+ outputType,
+ description);
+ }
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index bd6a7d4e1da..9f2ec090968 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -29,6 +29,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCorrelate;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSort;
@@ -161,6 +162,7 @@ public final class ExecNodeMetadataUtil {
add(BatchExecExchange.class);
add(BatchExecSort.class);
add(BatchExecValues.class);
+ add(BatchExecCorrelate.class);
}
};
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CorrelateBatchRestoreTest.java
similarity index 78%
copy from
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
copy to
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CorrelateBatchRestoreTest.java
index cc24919cdca..32325b16df2 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/batch/CorrelateBatchRestoreTest.java
@@ -16,19 +16,20 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
-import
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import
org.apache.flink.table.planner.plan.nodes.exec.common.CorrelateTestPrograms;
+import
org.apache.flink.table.planner.plan.nodes.exec.testutils.BatchRestoreTestBase;
import org.apache.flink.table.test.program.TableTestProgram;
import java.util.Arrays;
import java.util.List;
-/** Restore tests for {@link StreamExecCorrelate}. */
-public class CorrelateRestoreTest extends RestoreTestBase {
+/** Batch Compiled Plan tests for {@link BatchExecCorrelate}. */
+public class CorrelateBatchRestoreTest extends BatchRestoreTestBase {
- public CorrelateRestoreTest() {
- super(StreamExecCorrelate.class);
+ public CorrelateBatchRestoreTest() {
+ super(BatchExecCorrelate.class);
}
@Override
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
similarity index 96%
rename from
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateTestPrograms.java
rename to
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
index d1a2a1e46e3..09e46a20002 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CorrelateTestPrograms.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
+package org.apache.flink.table.planner.plan.nodes.exec.common;
import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.StringSplit;
import org.apache.flink.table.planner.utils.TableFunc1;
@@ -36,7 +36,7 @@ public class CorrelateTestPrograms {
static final String[] SOURCE_SCHEMA = {"a BIGINT", "b INT NOT NULL", "c
VARCHAR"};
- static final TableTestProgram CORRELATE_CATALOG_FUNC =
+ public static final TableTestProgram CORRELATE_CATALOG_FUNC =
TableTestProgram.of(
"correlate-catalog-func",
"validate correlate with temporary catalog
function")
@@ -65,7 +65,7 @@ public class CorrelateTestPrograms {
"INSERT INTO sink_t SELECT c, s FROM source_t,
LATERAL TABLE(func1(c, '$')) AS T(s)")
.build();
- static final TableTestProgram CORRELATE_SYSTEM_FUNC =
+ public static final TableTestProgram CORRELATE_SYSTEM_FUNC =
TableTestProgram.of(
"correlate-system-func",
"validate correlate with temporary system
function")
@@ -94,7 +94,7 @@ public class CorrelateTestPrograms {
"INSERT INTO sink_t SELECT c, s FROM source_t,
LATERAL TABLE(STRING_SPLIT(c, '#')) AS T(s)")
.build();
- static final TableTestProgram CORRELATE_JOIN_FILTER =
+ public static final TableTestProgram CORRELATE_JOIN_FILTER =
TableTestProgram.of("correlate-join-filter", "validate correlate
with join and filter")
.setupTemporaryCatalogFunction("func1", TableFunc1.class)
.setupTableSource(
@@ -114,7 +114,7 @@ public class CorrelateTestPrograms {
"INSERT INTO sink_t SELECT * FROM (SELECT c, s
FROM source_t, LATERAL TABLE(func1(c)) AS T(s)) AS T2 WHERE c LIKE '%hello%' OR
c LIKE '%fiz%'")
.build();
- static final TableTestProgram CORRELATE_LEFT_JOIN =
+ public static final TableTestProgram CORRELATE_LEFT_JOIN =
TableTestProgram.of("correlate-left-join", "validate correlate
with left join")
.setupTemporaryCatalogFunction("func1", TableFunc1.class)
.setupTableSource(
@@ -141,7 +141,7 @@ public class CorrelateTestPrograms {
"INSERT INTO sink_t SELECT c, s FROM source_t LEFT
JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE")
.build();
- static final TableTestProgram CORRELATE_CROSS_JOIN_UNNEST =
+ public static final TableTestProgram CORRELATE_CROSS_JOIN_UNNEST =
TableTestProgram.of(
"correlate-cross-join-unnest",
"validate correlate with cross join and unnest")
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
index cc24919cdca..02d487a80b5 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateRestoreTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
+import
org.apache.flink.table.planner.plan.nodes.exec.common.CorrelateTestPrograms;
import
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
import org.apache.flink.table.test.program.TableTestProgram;
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-catalog-func/plan/correlate-catalog-func.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-catalog-func/plan/correlate-catalog-func.json
new file mode 100644
index 00000000000..69623d2cd8f
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-catalog-func/plan/correlate-catalog-func.json
@@ -0,0 +1,142 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c`
VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t]], fields=[a, b, c])",
+ "dynamicFilteringDataListenerID" : "12f3bf79-0412-46aa-a3f2-4ed78bc05e75",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "batch-exec-correlate_1",
+ "joinType" : "INNER",
+ "functionCall" : {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`func1`",
+ "operands" : [ {
+ "kind" : "FIELD_ACCESS",
+ "name" : "c",
+ "expr" : {
+ "kind" : "CORREL_VARIABLE",
+ "correl" : "$cor0",
+ "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647),
`s` VARCHAR(2147483647)> NOT NULL"
+ }
+ }, {
+ "kind" : "LITERAL",
+ "value" : "$",
+ "type" : "CHAR(1) NOT NULL"
+ } ],
+ "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
+ },
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647),
`EXPR$0` VARCHAR(2147483647)>",
+ "description" : "Correlate(invocation=[func1($cor0.c, _UTF-16LE'$')],
correlate=[table(func1($cor0.c,'$'))], select=[a,b,c,EXPR$0],
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c,
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])"
+ }, {
+ "id" : 3,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+ "description" : "Calc(select=[c, EXPR$0 AS s])"
+ }, {
+ "id" : 4,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[c, s])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest/plan/correlate-cross-join-unnest.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest/plan/correlate-cross-join-unnest.json
new file mode 100644
index 00000000000..3e1dfd36382
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-cross-join-unnest/plan/correlate-cross-join-unnest.json
@@ -0,0 +1,135 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 18,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "arr",
+ "dataType" : "ARRAY<ROW<`nested` VARCHAR(2147483647)>>"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`name` VARCHAR(2147483647), `arr` ARRAY<ROW<`nested`
VARCHAR(2147483647)>>>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t]], fields=[name, arr])",
+ "dynamicFilteringDataListenerID" : "6956ae02-b818-4915-8709-b0dacd1e40ef",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 19,
+ "type" : "batch-exec-correlate_1",
+ "joinType" : "INNER",
+ "functionCall" : {
+ "kind" : "CALL",
+ "internalName" : "$UNNEST_ROWS$1",
+ "operands" : [ {
+ "kind" : "FIELD_ACCESS",
+ "name" : "arr",
+ "expr" : {
+ "kind" : "CORREL_VARIABLE",
+ "correl" : "$cor0",
+ "type" : "ROW<`name` VARCHAR(2147483647), `arr` ARRAY<ROW<`nested`
VARCHAR(2147483647)>>, `nested` VARCHAR(2147483647)> NOT NULL"
+ }
+ } ],
+ "type" : "ROW<`nested` VARCHAR(2147483647)>"
+ },
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647), `arr` ARRAY<ROW<`nested`
VARCHAR(2147483647)>>, `nested` VARCHAR(2147483647)>",
+ "description" : "Correlate(invocation=[$UNNEST_ROWS$1($cor0.arr)],
correlate=[table($UNNEST_ROWS$1($cor0.arr))], select=[name,arr,nested],
rowType=[RecordType(VARCHAR(2147483647) name,
RecordType:peek_no_expand(VARCHAR(2147483647) nested) ARRAY arr,
VARCHAR(2147483647) nested)], joinType=[INNER])"
+ }, {
+ "id" : 20,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647), `nested`
VARCHAR(2147483647)>",
+ "description" : "Calc(select=[name, nested])"
+ }, {
+ "id" : 21,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "nested",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`name` VARCHAR(2147483647), `nested`
VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[name, nested])"
+ } ],
+ "edges" : [ {
+ "source" : 18,
+ "target" : 19,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 19,
+ "target" : 20,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 20,
+ "target" : 21,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-join-filter/plan/correlate-join-filter.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-join-filter/plan/correlate-join-filter.json
new file mode 100644
index 00000000000..1528204c18f
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-join-filter/plan/correlate-join-filter.json
@@ -0,0 +1,209 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 9,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ },
+ "abilities" : [ {
+ "type" : "FilterPushDown",
+ "predicates" : [ ]
+ } ]
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c`
VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t, filter=[]]], fields=[a, b, c])",
+ "dynamicFilteringDataListenerID" : "5f166bc0-c35d-48d4-b73f-5942fa8e2348",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 10,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 1,
+ "type" : "INT NOT NULL"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$OR$1",
+ "operands" : [ {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$LIKE$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "%hello%",
+ "type" : "CHAR(7) NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$LIKE$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "%fiz%",
+ "type" : "CHAR(5) NOT NULL"
+ } ],
+ "type" : "BOOLEAN"
+ } ],
+ "type" : "BOOLEAN"
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c`
VARCHAR(2147483647)>",
+ "description" : "Calc(select=[a, b, c], where=[(LIKE(c, '%hello%') OR
LIKE(c, '%fiz%'))])"
+ }, {
+ "id" : 11,
+ "type" : "batch-exec-correlate_1",
+ "joinType" : "INNER",
+ "functionCall" : {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`func1`",
+ "operands" : [ {
+ "kind" : "FIELD_ACCESS",
+ "name" : "c",
+ "expr" : {
+ "kind" : "CORREL_VARIABLE",
+ "correl" : "$cor0",
+ "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647),
`s` VARCHAR(2147483647)> NOT NULL"
+ }
+ } ],
+ "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
+ },
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647),
`EXPR$0` VARCHAR(2147483647)>",
+ "description" : "Correlate(invocation=[func1($cor0.c)],
correlate=[table(func1($cor0.c))], select=[a,b,c,EXPR$0],
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c,
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])"
+ }, {
+ "id" : 12,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+ "description" : "Calc(select=[c, EXPR$0 AS s])"
+ }, {
+ "id" : 13,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[c, s])"
+ } ],
+ "edges" : [ {
+ "source" : 9,
+ "target" : 10,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 10,
+ "target" : 11,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 11,
+ "target" : 12,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 12,
+ "target" : 13,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-left-join/plan/correlate-left-join.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-left-join/plan/correlate-left-join.json
new file mode 100644
index 00000000000..52d3936d931
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-left-join/plan/correlate-left-join.json
@@ -0,0 +1,138 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 14,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c`
VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t]], fields=[a, b, c])",
+ "dynamicFilteringDataListenerID" : "f78696e3-bba1-4edf-9d9b-3a0c59f3deed",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 15,
+ "type" : "batch-exec-correlate_1",
+ "joinType" : "LEFT",
+ "functionCall" : {
+ "kind" : "CALL",
+ "catalogName" : "`default_catalog`.`default_database`.`func1`",
+ "operands" : [ {
+ "kind" : "FIELD_ACCESS",
+ "name" : "c",
+ "expr" : {
+ "kind" : "CORREL_VARIABLE",
+ "correl" : "$cor0",
+ "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647),
`s` VARCHAR(2147483647)> NOT NULL"
+ }
+ } ],
+ "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
+ },
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647),
`EXPR$0` VARCHAR(2147483647)>",
+ "description" : "Correlate(invocation=[func1($cor0.c)],
correlate=[table(func1($cor0.c))], select=[a,b,c,EXPR$0],
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c,
VARCHAR(2147483647) EXPR$0)], joinType=[LEFT])"
+ }, {
+ "id" : 16,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+ "description" : "Calc(select=[c, EXPR$0 AS s])"
+ }, {
+ "id" : 17,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[c, s])"
+ } ],
+ "edges" : [ {
+ "source" : 14,
+ "target" : 15,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 15,
+ "target" : 16,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 16,
+ "target" : 17,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-system-func/plan/correlate-system-func.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-system-func/plan/correlate-system-func.json
new file mode 100644
index 00000000000..a1f287232a6
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-correlate_1/correlate-system-func/plan/correlate-system-func.json
@@ -0,0 +1,142 @@
+{
+ "flinkVersion" : "2.0",
+ "nodes" : [ {
+ "id" : 5,
+ "type" : "batch-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`source_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c`
VARCHAR(2147483647)>",
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, source_t]], fields=[a, b, c])",
+ "dynamicFilteringDataListenerID" : "c180695a-cc29-44d8-b769-6dd6ed9fee20",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 6,
+ "type" : "batch-exec-correlate_1",
+ "joinType" : "INNER",
+ "functionCall" : {
+ "kind" : "CALL",
+ "systemName" : "STRING_SPLIT",
+ "operands" : [ {
+ "kind" : "FIELD_ACCESS",
+ "name" : "c",
+ "expr" : {
+ "kind" : "CORREL_VARIABLE",
+ "correl" : "$cor0",
+ "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647),
`s` VARCHAR(2147483647)> NOT NULL"
+ }
+ }, {
+ "kind" : "LITERAL",
+ "value" : "#",
+ "type" : "CHAR(1) NOT NULL"
+ } ],
+ "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
+ },
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647),
`EXPR$0` VARCHAR(2147483647)>",
+ "description" : "Correlate(invocation=[STRING_SPLIT($cor0.c,
_UTF-16LE'#')], correlate=[table(STRING_SPLIT($cor0.c,'#'))],
select=[a,b,c,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b,
VARCHAR(2147483647) c, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])"
+ }, {
+ "id" : 7,
+ "type" : "batch-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "VARCHAR(2147483647)"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 3,
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+ "description" : "Calc(select=[c, EXPR$0 AS s])"
+ }, {
+ "id" : 8,
+ "type" : "batch-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.type-length-enforcer" : "IGNORE"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "b",
+ "dataType" : "VARCHAR(2147483647)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ]
+ }
+ }
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "BLOCKING",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
+ "description" : "Sink(table=[default_catalog.default_database.sink_t],
fields=[c, s])"
+ } ],
+ "edges" : [ {
+ "source" : 5,
+ "target" : 6,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 6,
+ "target" : 7,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 7,
+ "target" : 8,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file