This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit dafed7ed282ba0807cf67e8ad2aa14cf68028ee6 Author: Sergey Nuyanzin <[email protected]> AuthorDate: Mon Aug 12 01:14:41 2024 +0200 [hotfix] Make use of `CompiledPlan#asJsonString` instead of `getFormattedJson` --- .../apache/flink/table/api/CompiledPlanITCase.java | 7 +- .../table/api/internal/StatementSetImplTest.java | 17 +- .../test/resources/jsonplan/testGetJsonPlan.out | 179 ++++++++++----------- .../jsonplan/testGetJsonPlanWithHints.out | 10 +- .../flink/table/planner/utils/TableTestBase.scala | 8 +- 5 files changed, 96 insertions(+), 125 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java index 76f5472d521..b0ccbd732ea 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java @@ -82,11 +82,8 @@ class CompiledPlanITCase extends JsonPlanTestBase { CompiledPlan compiledPlan = tableEnv.compilePlanSql("INSERT INTO MySink SELECT * FROM MyTable"); String expected = TableTestUtil.readFromResource("/jsonplan/testGetJsonPlan.out"); - assertThat( - getPreparedToCompareCompiledPlan( - TableTestUtil.getFormattedJson(compiledPlan.asJsonString()))) - .isEqualTo( - getPreparedToCompareCompiledPlan(TableTestUtil.getFormattedJson(expected))); + assertThat(getPreparedToCompareCompiledPlan(compiledPlan.asJsonString())) + .isEqualTo(getPreparedToCompareCompiledPlan(expected)); } @Test diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/StatementSetImplTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/StatementSetImplTest.java index de0940464cb..1425f11eea9 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/StatementSetImplTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/StatementSetImplTest.java @@ -26,8 +26,6 @@ import org.apache.flink.table.planner.utils.TableTestUtil; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.io.IOException; - import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link StatementSetImpl}. */ @@ -43,7 +41,7 @@ class StatementSetImplTest { } @Test - void testGetJsonPlan() throws IOException { + void testGetJsonPlan() { String srcTableDdl = "CREATE TABLE MyTable (\n" + " a bigint,\n" @@ -67,15 +65,8 @@ class StatementSetImplTest { StatementSet stmtSet = tableEnv.createStatementSet(); stmtSet.addInsertSql("INSERT INTO MySink SELECT * FROM MyTable"); String jsonPlan = stmtSet.compilePlan().asJsonString(); - String actual = TableTestUtil.readFromResource("/jsonplan/testGetJsonPlan.out"); - assertThat( - TableTestUtil.getFormattedJson( - TableTestUtil.replaceExecNodeId( - TableTestUtil.getFormattedJson(actual)))) - .isEqualTo( - TableTestUtil.getFormattedJson( - TableTestUtil.replaceExecNodeId( - TableTestUtil.replaceFlinkVersion( - TableTestUtil.getFormattedJson(jsonPlan))))); + String expected = TableTestUtil.readFromResource("/jsonplan/testGetJsonPlan.out"); + assertThat(TableTestUtil.replaceFlinkVersion(TableTestUtil.replaceExecNodeId(jsonPlan))) + .isEqualTo(TableTestUtil.replaceExecNodeId(expected)); } } diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out index 57549b10481..ec46db0d0e4 100644 --- a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out +++ b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out @@ -1,105 +1,88 @@ { - "flinkVersion": "", - "nodes": [ - { - "id": 1, - "type": "stream-exec-table-source-scan_1", - "scanTableSource": { - "table": { - "identifier": "`default_catalog`.`default_database`.`MyTable`", - "resolvedTable": { - "schema": { - "columns": [ - { - "name": "a", - "dataType": "BIGINT" - }, - { - "name": "b", - "dataType": "INT" - }, - { - "name": "c", - "dataType": "VARCHAR(2147483647)" - } - ], - "watermarkSpecs": [] - }, - "partitionKeys": [], - "options": { - "bounded": "false", - "connector": "values" - } + "flinkVersion" : "", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ], + "options" : { + "bounded" : "false", + "connector" : "values" } } - }, - "outputType": "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description": "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", - "inputProperties": [] + } }, - { - "id": 2, - "type": "stream-exec-sink_1", - "configuration":{ - "table.exec.sink.keyed-shuffle":"AUTO", - "table.exec.sink.not-null-enforcer":"ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer":"IGNORE", - "table.exec.sink.upsert-materialize":"AUTO" - }, - "dynamicTableSink": { - "table": { - "identifier": "`default_catalog`.`default_database`.`MySink`", - "resolvedTable": { - "schema": { - "columns": [ - { - "name": "a", - "dataType": "BIGINT" - }, - { - "name": "b", - "dataType": "INT" - }, - { - "name": "c", - "dataType": "VARCHAR(2147483647)" - } - ], - "watermarkSpecs": [] - }, - "partitionKeys": [], - "options": { - "connector": "values", - "table-sink-class": "DEFAULT" - } - } - } - }, - "inputChangelogMode": [ - "INSERT" - ], - "inputProperties": [ - { - "requiredDistribution": { - "type": "UNKNOWN" + "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MySink`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] }, - "damBehavior": "PIPELINED", - "priority": 0 + "partitionKeys" : [ ], + "options" : { + "connector" : "values", + "table-sink-class" : "DEFAULT" + } } - ], - "outputType": "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", - "description": "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])" - } - ], - "edges": [ - { - "source": 1, - "target": 2, - "shuffle": { - "type": "FORWARD" + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" }, - "shuffleMode": "PIPELINED" - } - ] -} + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out index fd7c70323e1..9b5893b048e 100644 --- a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out +++ b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out @@ -1,7 +1,7 @@ { - "flinkVersion": "", + "flinkVersion" : "", "nodes" : [ { - "id": 0, + "id" : 0, "type" : "stream-exec-table-source-scan_1", "scanTableSource" : { "table" : { @@ -33,7 +33,7 @@ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.parallelism=2, bounded=true}]]])", "inputProperties" : [ ] }, { - "id": 0, + "id" : 0, "type" : "stream-exec-sink_1", "configuration" : { "table.exec.sink.keyed-shuffle" : "AUTO", @@ -79,8 +79,8 @@ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])" } ], "edges" : [ { - "source": 0, - "target": 0, + "source" : 0, + "target" : 0, "shuffle" : { "type" : "FORWARD" }, diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 4b19ef16150..a04695d072e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -1816,14 +1816,14 @@ object TableTestUtil { /** ExecNode {id} is ignored, because id keeps incrementing in test class. */ def replaceExecNodeId(s: String): String = { - s.replaceAll("\"id\"\\s*:\\s*\\d+", "\"id\": 0") - .replaceAll("\"source\"\\s*:\\s*\\d+", "\"source\": 0") - .replaceAll("\"target\"\\s*:\\s*\\d+", "\"target\": 0") + s.replaceAll("\"id\"\\s*:\\s*\\d+", "\"id\" : 0") + .replaceAll("\"source\"\\s*:\\s*\\d+", "\"source\" : 0") + .replaceAll("\"target\"\\s*:\\s*\\d+", "\"target\" : 0") } /** Ignore flink version value. */ def replaceFlinkVersion(s: String): String = { - s.replaceAll("\"flinkVersion\"\\s*:\\s*\"[\\w.-]*\"", "\"flinkVersion\": \"\"") + s.replaceAll("\"flinkVersion\"\\s*:\\s*\"[\\w.-]*\"", "\"flinkVersion\" : \"\"") } /** Ignore exec node in operator name and description. */
