This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 263f3283724a5081e41f679659fa6a5819350739
Author: bvarghese1 <[email protected]>
AuthorDate: Mon Jan 8 09:57:49 2024 -0800

    [FLINK-33896] Remove Correlate Json Plan & Json IT tests
    
    - These are covered by the restore tests
---
 .../nodes/exec/stream/CorrelateJsonPlanTest.java   | 139 -----------------
 .../stream/jsonplan/CorrelateJsonPlanITCase.java   | 119 ---------------
 .../testCrossJoin.out                              | 152 -------------------
 .../testCrossJoinOverrideParameters.out            | 156 -------------------
 .../testJoinWithFilter.out                         | 166 ---------------------
 .../testLeftOuterJoinWithLiteralTrue.out           | 152 -------------------
 6 files changed, 884 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest.java
deleted file mode 100644
index d7672a03e03..00000000000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableFunc1;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-
-/** Test json serialization/deserialization for correlate. */
-class CorrelateJsonPlanTest extends TableTestBase {
-    private StreamTableTestUtil util;
-    private TableEnvironment tEnv;
-
-    @BeforeEach
-    void setup() {
-        util = streamTestUtil(TableConfig.getDefault());
-        tEnv = util.getTableEnv();
-
-        String srcTableDdl =
-                "CREATE TABLE MyTable (\n"
-                        + "  a bigint,\n"
-                        + "  b int not null,\n"
-                        + "  c varchar,\n"
-                        + "  d timestamp(3)\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'bounded' = 'false')";
-        tEnv.executeSql(srcTableDdl);
-    }
-
-    @Test
-    void testCrossJoin() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a varchar,\n"
-                        + "  b varchar\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-
-        util.addTemporarySystemFunction("func1", TableFunc1.class);
-        String sqlQuery =
-                "insert into MySink SELECT c, s FROM MyTable, LATERAL 
TABLE(func1(c)) AS T(s)";
-        util.verifyJsonPlan(sqlQuery);
-    }
-
-    @Test
-    @Disabled("the case is ignored because of FLINK-21870")
-    void testRegisterByClass() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a varchar,\n"
-                        + "  b varchar\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-
-        tEnv.createTemporaryFunction("func1", TableFunc1.class);
-        String sqlQuery =
-                "insert into MySink SELECT c, s FROM MyTable, LATERAL 
TABLE(func1(c)) AS T(s)";
-        util.verifyJsonPlan(sqlQuery);
-    }
-
-    @Test
-    void testCrossJoinOverrideParameters() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a varchar,\n"
-                        + "  b varchar\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-
-        util.addTemporarySystemFunction("func1", TableFunc1.class);
-        String sqlQuery =
-                "insert into MySink SELECT c, s FROM MyTable, LATERAL 
TABLE(func1(c, '$')) AS T(s)";
-        util.verifyJsonPlan(sqlQuery);
-    }
-
-    @Test
-    void testLeftOuterJoinWithLiteralTrue() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a varchar,\n"
-                        + "  b varchar\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-
-        util.addTemporarySystemFunction("func1", TableFunc1.class);
-        String sqlQuery =
-                "insert into MySink SELECT c, s FROM MyTable LEFT JOIN LATERAL 
TABLE(func1(c)) AS T(s) ON TRUE";
-        util.verifyJsonPlan(sqlQuery);
-    }
-
-    @Test
-    void testJoinWithFilter() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a varchar,\n"
-                        + "  b varchar\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-
-        util.addTemporarySystemFunction("func1", TableFunc1.class);
-        String sqlQuery =
-                "insert into MySink "
-                        + "select * from (SELECT c, s FROM MyTable, LATERAL 
TABLE(func1(c)) AS T(s)) as T2 where c = s";
-        util.verifyJsonPlan(sqlQuery);
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
deleted file mode 100644
index a6e4fe6198a..00000000000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.runtime.stream.jsonplan;
-
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions;
-import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.apache.flink.types.Row;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-/** Integration tests for correlate. */
-class CorrelateJsonPlanITCase extends JsonPlanTestBase {
-
-    @BeforeEach
-    void before() {
-        List<Row> data = Collections.singletonList(Row.of("1,1,hi"));
-        createTestValuesSourceTable("MyTable", data, "a varchar");
-    }
-
-    @Test
-    void testSystemFuncByObject() throws ExecutionException, 
InterruptedException {
-        tableEnv.createTemporarySystemFunction(
-                "STRING_SPLIT", new 
JavaUserDefinedTableFunctions.StringSplit());
-        createTestValuesSinkTable("MySink", "a STRING", "b STRING");
-        String query =
-                "insert into MySink SELECT a, v FROM MyTable, lateral 
table(STRING_SPLIT(a, ',')) as T(v)";
-        compileSqlAndExecutePlan(query).await();
-        List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 
1]", "+I[1,1,hi, hi]");
-        assertResult(expected, 
TestValuesTableFactory.getResultsAsStrings("MySink"));
-    }
-
-    @Test
-    void testSystemFuncByClass() throws ExecutionException, 
InterruptedException {
-        tableEnv.createTemporarySystemFunction(
-                "STRING_SPLIT", 
JavaUserDefinedTableFunctions.StringSplit.class);
-        createTestValuesSinkTable("MySink", "a STRING", "b STRING");
-        String query =
-                "insert into MySink SELECT a, v FROM MyTable, lateral 
table(STRING_SPLIT(a, ',')) as T(v)";
-        compileSqlAndExecutePlan(query).await();
-        List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 
1]", "+I[1,1,hi, hi]");
-        assertResult(expected, 
TestValuesTableFactory.getResultsAsStrings("MySink"));
-    }
-
-    @Test
-    void testTemporaryFuncByObject() throws ExecutionException, 
InterruptedException {
-        tableEnv.createTemporaryFunction(
-                "STRING_SPLIT", new 
JavaUserDefinedTableFunctions.StringSplit());
-        createTestValuesSinkTable("MySink", "a STRING", "b STRING");
-        String query =
-                "insert into MySink SELECT a, v FROM MyTable, lateral 
table(STRING_SPLIT(a, ',')) as T(v)";
-        compileSqlAndExecutePlan(query).await();
-        List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 
1]", "+I[1,1,hi, hi]");
-        assertResult(expected, 
TestValuesTableFactory.getResultsAsStrings("MySink"));
-    }
-
-    @Test
-    void testTemporaryFuncByClass() throws ExecutionException, 
InterruptedException {
-        tableEnv.createTemporaryFunction(
-                "STRING_SPLIT", 
JavaUserDefinedTableFunctions.StringSplit.class);
-        createTestValuesSinkTable("MySink", "a STRING", "b STRING");
-        String query =
-                "insert into MySink SELECT a, v FROM MyTable, lateral 
table(STRING_SPLIT(a, ',')) as T(v)";
-        compileSqlAndExecutePlan(query).await();
-        List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 
1]", "+I[1,1,hi, hi]");
-        assertResult(expected, 
TestValuesTableFactory.getResultsAsStrings("MySink"));
-    }
-
-    @Test
-    void testFilter() throws ExecutionException, InterruptedException {
-        tableEnv.createTemporarySystemFunction(
-                "STRING_SPLIT", new 
JavaUserDefinedTableFunctions.StringSplit());
-        createTestValuesSinkTable("MySink", "a STRING", "b STRING");
-        String query =
-                "insert into MySink "
-                        + "SELECT a, v FROM MyTable, lateral 
table(STRING_SPLIT(a, ',')) as T(v) "
-                        + "where try_cast(v as int) > 0";
-        compileSqlAndExecutePlan(query).await();
-        List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 
1]");
-        assertResult(expected, 
TestValuesTableFactory.getResultsAsStrings("MySink"));
-    }
-
-    @Test
-    void testUnnest() throws ExecutionException, InterruptedException {
-        List<Row> data =
-                Collections.singletonList(
-                        Row.of("Bob", new Row[] {Row.of("1"), Row.of("2"), 
Row.of("3")}));
-        createTestValuesSourceTable(
-                "MyNestedTable", data, "name STRING", "arr ARRAY<ROW<nested 
STRING>>");
-        createTestValuesSinkTable("MySink", "name STRING", "nested STRING");
-        String query =
-                "INSERT INTO MySink SELECT name, nested FROM MyNestedTable 
CROSS JOIN UNNEST(arr) AS t (nested)";
-        compileSqlAndExecutePlan(query).await();
-        List<String> expected = Arrays.asList("+I[Bob, 1]", "+I[Bob, 2]", 
"+I[Bob, 3]");
-        assertResult(expected, 
TestValuesTableFactory.getResultsAsStrings("MySink"));
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testCrossJoin.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testCrossJoin.out
deleted file mode 100644
index 6f30a30e341..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testCrossJoin.out
+++ /dev/null
@@ -1,152 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "TIMESTAMP(3)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-correlate_1",
-    "joinType" : "INNER",
-    "functionCall" : {
-      "kind" : "CALL",
-      "systemName" : "func1",
-      "operands" : [ {
-        "kind" : "FIELD_ACCESS",
-        "name" : "c",
-        "expr" : {
-          "kind" : "CORREL_VARIABLE",
-          "correl" : "$cor0",
-          "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3), `s` VARCHAR(2147483647)> NOT NULL"
-        }
-      } ],
-      "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
-    },
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3), `EXPR$0` VARCHAR(2147483647)>",
-    "description" : "Correlate(invocation=[func1($cor0.c)], 
correlate=[table(func1($cor0.c))], select=[a,b,c,d,EXPR$0], 
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, TIMESTAMP(3) d, 
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "VARCHAR(2147483647)"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 4,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
-    "description" : "Calc(select=[c, EXPR$0 AS s])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "b",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[c, s])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testCrossJoinOverrideParameters.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testCrossJoinOverrideParameters.out
deleted file mode 100644
index dde24802d19..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testCrossJoinOverrideParameters.out
+++ /dev/null
@@ -1,156 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "TIMESTAMP(3)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-correlate_1",
-    "joinType" : "INNER",
-    "functionCall" : {
-      "kind" : "CALL",
-      "systemName" : "func1",
-      "operands" : [ {
-        "kind" : "FIELD_ACCESS",
-        "name" : "c",
-        "expr" : {
-          "kind" : "CORREL_VARIABLE",
-          "correl" : "$cor0",
-          "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3), `s` VARCHAR(2147483647)> NOT NULL"
-        }
-      }, {
-        "kind" : "LITERAL",
-        "value" : "$",
-        "type" : "CHAR(1) NOT NULL"
-      } ],
-      "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
-    },
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3), `EXPR$0` VARCHAR(2147483647)>",
-    "description" : "Correlate(invocation=[func1($cor0.c, _UTF-16LE'$')], 
correlate=[table(func1($cor0.c,'$'))], select=[a,b,c,d,EXPR$0], 
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, TIMESTAMP(3) d, 
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "VARCHAR(2147483647)"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 4,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
-    "description" : "Calc(select=[c, EXPR$0 AS s])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "b",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[c, s])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out
deleted file mode 100644
index bc89928a365..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out
+++ /dev/null
@@ -1,166 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "TIMESTAMP(3)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-correlate_1",
-    "joinType" : "INNER",
-    "functionCall" : {
-      "kind" : "CALL",
-      "systemName" : "func1",
-      "operands" : [ {
-        "kind" : "FIELD_ACCESS",
-        "name" : "c",
-        "expr" : {
-          "kind" : "CORREL_VARIABLE",
-          "correl" : "$cor0",
-          "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3), `s` VARCHAR(2147483647)> NOT NULL"
-        }
-      } ],
-      "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
-    },
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3), `EXPR$0` VARCHAR(2147483647)>",
-    "description" : "Correlate(invocation=[func1($cor0.c)], 
correlate=[table(func1($cor0.c))], select=[a,b,c,d,EXPR$0], 
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, TIMESTAMP(3) d, 
VARCHAR(2147483647) EXPR$0)], joinType=[INNER])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "VARCHAR(2147483647)"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 4,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : {
-      "kind" : "CALL",
-      "syntax" : "BINARY",
-      "internalName" : "$=$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 2,
-        "type" : "VARCHAR(2147483647)"
-      }, {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 4,
-        "type" : "VARCHAR(2147483647)"
-      } ],
-      "type" : "BOOLEAN"
-    },
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`c` VARCHAR(2147483647), `EXPR$0` 
VARCHAR(2147483647)>",
-    "description" : "Calc(select=[c, EXPR$0], where=[(c = EXPR$0)])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "b",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`c` VARCHAR(2147483647), `EXPR$0` 
VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[c, EXPR$0])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testLeftOuterJoinWithLiteralTrue.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testLeftOuterJoinWithLiteralTrue.out
deleted file mode 100644
index d0645182747..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testLeftOuterJoinWithLiteralTrue.out
+++ /dev/null
@@ -1,152 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "BIGINT"
-            }, {
-              "name" : "b",
-              "dataType" : "INT NOT NULL"
-            }, {
-              "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "TIMESTAMP(3)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, d])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-correlate_1",
-    "joinType" : "LEFT",
-    "functionCall" : {
-      "kind" : "CALL",
-      "systemName" : "func1",
-      "operands" : [ {
-        "kind" : "FIELD_ACCESS",
-        "name" : "c",
-        "expr" : {
-          "kind" : "CORREL_VARIABLE",
-          "correl" : "$cor0",
-          "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3), `s` VARCHAR(2147483647)> NOT NULL"
-        }
-      } ],
-      "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL"
-    },
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), 
`d` TIMESTAMP(3), `EXPR$0` VARCHAR(2147483647)>",
-    "description" : "Correlate(invocation=[func1($cor0.c)], 
correlate=[table(func1($cor0.c))], select=[a,b,c,d,EXPR$0], 
rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, TIMESTAMP(3) d, 
VARCHAR(2147483647) EXPR$0)], joinType=[LEFT])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "VARCHAR(2147483647)"
-    }, {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 4,
-      "type" : "VARCHAR(2147483647)"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
-    "description" : "Calc(select=[c, EXPR$0 AS s])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "b",
-              "dataType" : "VARCHAR(2147483647)"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[c, s])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 4,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}


Reply via email to