This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 263f3283724a5081e41f679659fa6a5819350739 Author: bvarghese1 <[email protected]> AuthorDate: Mon Jan 8 09:57:49 2024 -0800 [FLINK-33896] Remove Correlate Json Plan & Json IT tests - These are covered by the restore tests --- .../nodes/exec/stream/CorrelateJsonPlanTest.java | 139 ----------------- .../stream/jsonplan/CorrelateJsonPlanITCase.java | 119 --------------- .../testCrossJoin.out | 152 ------------------- .../testCrossJoinOverrideParameters.out | 156 ------------------- .../testJoinWithFilter.out | 166 --------------------- .../testLeftOuterJoinWithLiteralTrue.out | 152 ------------------- 6 files changed, 884 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest.java deleted file mode 100644 index d7672a03e03..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableFunc1; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -/** Test json serialization/deserialization for correlate. */ -class CorrelateJsonPlanTest extends TableTestBase { - private StreamTableTestUtil util; - private TableEnvironment tEnv; - - @BeforeEach - void setup() { - util = streamTestUtil(TableConfig.getDefault()); - tEnv = util.getTableEnv(); - - String srcTableDdl = - "CREATE TABLE MyTable (\n" - + " a bigint,\n" - + " b int not null,\n" - + " c varchar,\n" - + " d timestamp(3)\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'bounded' = 'false')"; - tEnv.executeSql(srcTableDdl); - } - - @Test - void testCrossJoin() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a varchar,\n" - + " b varchar\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - - util.addTemporarySystemFunction("func1", TableFunc1.class); - String sqlQuery = - "insert into MySink SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"; - util.verifyJsonPlan(sqlQuery); - } - - @Test - @Disabled("the case is ignored because of FLINK-21870") - void testRegisterByClass() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a varchar,\n" - + " b varchar\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - - tEnv.createTemporaryFunction("func1", TableFunc1.class); - String sqlQuery = - "insert into MySink SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"; - util.verifyJsonPlan(sqlQuery); - } - - @Test - void testCrossJoinOverrideParameters() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a varchar,\n" - + " b varchar\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - - util.addTemporarySystemFunction("func1", TableFunc1.class); - String sqlQuery = - "insert into MySink SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)"; - util.verifyJsonPlan(sqlQuery); - } - - @Test - void testLeftOuterJoinWithLiteralTrue() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a varchar,\n" - + " b varchar\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - - util.addTemporarySystemFunction("func1", TableFunc1.class); - String sqlQuery = - "insert into MySink SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"; - util.verifyJsonPlan(sqlQuery); - } - - @Test - void testJoinWithFilter() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a varchar,\n" - + " b varchar\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - - util.addTemporarySystemFunction("func1", TableFunc1.class); - String sqlQuery = - "insert into MySink " - + "select * from (SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)) as T2 where c = s"; - util.verifyJsonPlan(sqlQuery); - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java deleted file mode 100644 index a6e4fe6198a..00000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/CorrelateJsonPlanITCase.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.runtime.stream.jsonplan; - -import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions; -import org.apache.flink.table.planner.utils.JsonPlanTestBase; -import org.apache.flink.types.Row; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ExecutionException; - -/** Integration tests for correlate. */ -class CorrelateJsonPlanITCase extends JsonPlanTestBase { - - @BeforeEach - void before() { - List<Row> data = Collections.singletonList(Row.of("1,1,hi")); - createTestValuesSourceTable("MyTable", data, "a varchar"); - } - - @Test - void testSystemFuncByObject() throws ExecutionException, InterruptedException { - tableEnv.createTemporarySystemFunction( - "STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit()); - createTestValuesSinkTable("MySink", "a STRING", "b STRING"); - String query = - "insert into MySink SELECT a, v FROM MyTable, lateral table(STRING_SPLIT(a, ',')) as T(v)"; - compileSqlAndExecutePlan(query).await(); - List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 1]", "+I[1,1,hi, hi]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } - - @Test - void testSystemFuncByClass() throws ExecutionException, InterruptedException { - tableEnv.createTemporarySystemFunction( - "STRING_SPLIT", JavaUserDefinedTableFunctions.StringSplit.class); - createTestValuesSinkTable("MySink", "a STRING", "b STRING"); - String query = - "insert into MySink SELECT a, v FROM MyTable, lateral table(STRING_SPLIT(a, ',')) as T(v)"; - compileSqlAndExecutePlan(query).await(); - List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 1]", "+I[1,1,hi, hi]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } - - @Test - void testTemporaryFuncByObject() throws ExecutionException, InterruptedException { - tableEnv.createTemporaryFunction( - "STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit()); - createTestValuesSinkTable("MySink", "a STRING", "b STRING"); - String query = - "insert into MySink SELECT a, v FROM MyTable, lateral table(STRING_SPLIT(a, ',')) as T(v)"; - compileSqlAndExecutePlan(query).await(); - List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 1]", "+I[1,1,hi, hi]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } - - @Test - void testTemporaryFuncByClass() throws ExecutionException, InterruptedException { - tableEnv.createTemporaryFunction( - "STRING_SPLIT", JavaUserDefinedTableFunctions.StringSplit.class); - createTestValuesSinkTable("MySink", "a STRING", "b STRING"); - String query = - "insert into MySink SELECT a, v FROM MyTable, lateral table(STRING_SPLIT(a, ',')) as T(v)"; - compileSqlAndExecutePlan(query).await(); - List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 1]", "+I[1,1,hi, hi]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } - - @Test - void testFilter() throws ExecutionException, InterruptedException { - tableEnv.createTemporarySystemFunction( - "STRING_SPLIT", new JavaUserDefinedTableFunctions.StringSplit()); - createTestValuesSinkTable("MySink", "a STRING", "b STRING"); - String query = - "insert into MySink " - + "SELECT a, v FROM MyTable, lateral table(STRING_SPLIT(a, ',')) as T(v) " - + "where try_cast(v as int) > 0"; - compileSqlAndExecutePlan(query).await(); - List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 1]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } - - @Test - void testUnnest() throws ExecutionException, InterruptedException { - List<Row> data = - Collections.singletonList( - Row.of("Bob", new Row[] {Row.of("1"), Row.of("2"), Row.of("3")})); - createTestValuesSourceTable( - "MyNestedTable", data, "name STRING", "arr ARRAY<ROW<nested STRING>>"); - createTestValuesSinkTable("MySink", "name STRING", "nested STRING"); - String query = - "INSERT INTO MySink SELECT name, nested FROM MyNestedTable CROSS JOIN UNNEST(arr) AS t (nested)"; - compileSqlAndExecutePlan(query).await(); - List<String> expected = Arrays.asList("+I[Bob, 1]", "+I[Bob, 2]", "+I[Bob, 3]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testCrossJoin.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testCrossJoin.out deleted file mode 100644 index 6f30a30e341..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testCrossJoin.out +++ /dev/null @@ -1,152 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "TIMESTAMP(3)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-correlate_1", - "joinType" : "INNER", - "functionCall" : { - "kind" : "CALL", - "systemName" : "func1", - "operands" : [ { - "kind" : "FIELD_ACCESS", - "name" : "c", - "expr" : { - "kind" : "CORREL_VARIABLE", - "correl" : "$cor0", - "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3), `s` VARCHAR(2147483647)> NOT NULL" - } - } ], - "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL" - }, - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3), `EXPR$0` VARCHAR(2147483647)>", - "description" : "Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))], select=[a,b,c,d,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, TIMESTAMP(3) d, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])" - }, { - "id" : 3, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>", - "description" : "Calc(select=[c, EXPR$0 AS s])" - }, { - "id" : 4, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "b", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[c, s])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testCrossJoinOverrideParameters.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testCrossJoinOverrideParameters.out deleted file mode 100644 index dde24802d19..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testCrossJoinOverrideParameters.out +++ /dev/null @@ -1,156 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "TIMESTAMP(3)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-correlate_1", - "joinType" : "INNER", - "functionCall" : { - "kind" : "CALL", - "systemName" : "func1", - "operands" : [ { - "kind" : "FIELD_ACCESS", - "name" : "c", - "expr" : { - "kind" : "CORREL_VARIABLE", - "correl" : "$cor0", - "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3), `s` VARCHAR(2147483647)> NOT NULL" - } - }, { - "kind" : "LITERAL", - "value" : "$", - "type" : "CHAR(1) NOT NULL" - } ], - "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL" - }, - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3), `EXPR$0` VARCHAR(2147483647)>", - "description" : "Correlate(invocation=[func1($cor0.c, _UTF-16LE'$')], correlate=[table(func1($cor0.c,'$'))], select=[a,b,c,d,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, TIMESTAMP(3) d, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])" - }, { - "id" : 3, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>", - "description" : "Calc(select=[c, EXPR$0 AS s])" - }, { - "id" : 4, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "b", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[c, s])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out deleted file mode 100644 index bc89928a365..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out +++ /dev/null @@ -1,166 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "TIMESTAMP(3)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-correlate_1", - "joinType" : "INNER", - "functionCall" : { - "kind" : "CALL", - "systemName" : "func1", - "operands" : [ { - "kind" : "FIELD_ACCESS", - "name" : "c", - "expr" : { - "kind" : "CORREL_VARIABLE", - "correl" : "$cor0", - "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3), `s` VARCHAR(2147483647)> NOT NULL" - } - } ], - "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL" - }, - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3), `EXPR$0` VARCHAR(2147483647)>", - "description" : "Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))], select=[a,b,c,d,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, TIMESTAMP(3) d, VARCHAR(2147483647) EXPR$0)], joinType=[INNER])" - }, { - "id" : 3, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : { - "kind" : "CALL", - "syntax" : "BINARY", - "internalName" : "$=$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "VARCHAR(2147483647)" - } ], - "type" : "BOOLEAN" - }, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`c` VARCHAR(2147483647), `EXPR$0` VARCHAR(2147483647)>", - "description" : "Calc(select=[c, EXPR$0], where=[(c = EXPR$0)])" - }, { - "id" : 4, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "b", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`c` VARCHAR(2147483647), `EXPR$0` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[c, EXPR$0])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testLeftOuterJoinWithLiteralTrue.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testLeftOuterJoinWithLiteralTrue.out deleted file mode 100644 index d0645182747..00000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/CorrelateJsonPlanTest_jsonplan/testLeftOuterJoinWithLiteralTrue.out +++ /dev/null @@ -1,152 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "BIGINT" - }, { - "name" : "b", - "dataType" : "INT NOT NULL" - }, { - "name" : "c", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "d", - "dataType" : "TIMESTAMP(3)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3)>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-correlate_1", - "joinType" : "LEFT", - "functionCall" : { - "kind" : "CALL", - "systemName" : "func1", - "operands" : [ { - "kind" : "FIELD_ACCESS", - "name" : "c", - "expr" : { - "kind" : "CORREL_VARIABLE", - "correl" : "$cor0", - "type" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3), `s` VARCHAR(2147483647)> NOT NULL" - } - } ], - "type" : "ROW<`EXPR$0` VARCHAR(2147483647)> NOT NULL" - }, - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL, `c` VARCHAR(2147483647), `d` TIMESTAMP(3), `EXPR$0` VARCHAR(2147483647)>", - "description" : "Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))], select=[a,b,c,d,EXPR$0], rowType=[RecordType(BIGINT a, INTEGER b, VARCHAR(2147483647) c, TIMESTAMP(3) d, VARCHAR(2147483647) EXPR$0)], joinType=[LEFT])" - }, { - "id" : 3, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "VARCHAR(2147483647)" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>", - "description" : "Calc(select=[c, EXPR$0 AS s])" - }, { - "id" : 4, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "b", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`c` VARCHAR(2147483647), `s` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[c, s])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -}
