This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ac9d3436fa42ba36e1ea014b2a581e9ea11816ad Author: Sergey Nuyanzin <[email protected]> AuthorDate: Fri Aug 9 13:52:17 2024 +0200 [FLINK-36026][table] Options from `OPTIONS` hint should be present in compiled plan This closes #25186 --- .editorconfig | 3 + .../planner/plan/schema/CatalogSourceTable.java | 6 +- .../apache/flink/table/api/CompiledPlanITCase.java | 29 +++++-- .../jsonplan/testGetJsonPlanWithHints.out | 89 ++++++++++++++++++++++ 4 files changed, 117 insertions(+), 10 deletions(-) diff --git a/.editorconfig b/.editorconfig index 333c2a16580..5aa9a33ccd2 100644 --- a/.editorconfig +++ b/.editorconfig @@ -263,6 +263,9 @@ ij_java_variable_annotation_wrap = normal ij_java_wrap_first_method_in_call_chain = true # ij_java_wrap_long_lines = false +[*.out] +insert_final_newline = false + [*.xml] indent_style = tab indent_size = 4 diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java index dd501487e4e..0c1cb09188f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.java @@ -107,19 +107,19 @@ public final class CatalogSourceTable extends FlinkPreparingTableBase { // finalize catalog table with option hints final Map<String, String> hintedOptions = FlinkHints.getHintedOptions(hints); - final ContextResolvedTable catalogTable = + final ContextResolvedTable contextTableWithHints = computeContextResolvedTable(context, hintedOptions); // create table source final DynamicTableSource tableSource = - createDynamicTableSource(context, catalogTable.getResolvedTable()); + createDynamicTableSource(context, contextTableWithHints.getResolvedTable()); // prepare table source and convert to RelNode return DynamicSourceUtils.convertSourceToRel( !schemaTable.isStreamingMode(), context.getTableConfig(), relBuilder, - schemaTable.getContextResolvedTable(), + contextTableWithHints, schemaTable.getStatistic(), hints, tableSource); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java index 514a57b9c5d..76f5472d521 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java @@ -83,14 +83,25 @@ class CompiledPlanITCase extends JsonPlanTestBase { tableEnv.compilePlanSql("INSERT INTO MySink SELECT * FROM MyTable"); String expected = TableTestUtil.readFromResource("/jsonplan/testGetJsonPlan.out"); assertThat( - TableTestUtil.replaceExecNodeId( - TableTestUtil.replaceFlinkVersion( - TableTestUtil.getFormattedJson( - compiledPlan.asJsonString())))) + getPreparedToCompareCompiledPlan( + TableTestUtil.getFormattedJson(compiledPlan.asJsonString()))) .isEqualTo( - TableTestUtil.replaceExecNodeId( - TableTestUtil.replaceFlinkVersion( - TableTestUtil.getFormattedJson(expected)))); + getPreparedToCompareCompiledPlan(TableTestUtil.getFormattedJson(expected))); + } + + @Test + void testSourceTableWithHints() { + CompiledPlan compiledPlan = + tableEnv.compilePlanSql( + "INSERT INTO MySink SELECT * FROM MyTable" + // OPTIONS hints here do not play any significant role + // we just have to be sure that these options are present in + // compiled plan + + " /*+ OPTIONS('bounded'='true', 'scan.parallelism'='2') */"); + + String expected = TableTestUtil.readFromResource("/jsonplan/testGetJsonPlanWithHints.out"); + assertThat(getPreparedToCompareCompiledPlan(compiledPlan.asJsonString())) + .isEqualTo(expected); } @Test @@ -417,4 +428,8 @@ class CompiledPlanITCase extends JsonPlanTestBase { createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION); return createTestCsvSinkTable("sink", COLUMNS_DEFINITION); } + + private String getPreparedToCompareCompiledPlan(final String planAsString) { + return TableTestUtil.replaceExecNodeId(TableTestUtil.replaceFlinkVersion(planAsString)); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out new file mode 100644 index 00000000000..fd7c70323e1 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlanWithHints.out @@ -0,0 +1,89 @@ +{ + "flinkVersion": "", + "nodes" : [ { + "id": 0, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MyTable`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ], + "options" : { + "bounded" : "true", + "connector" : "values", + "scan.parallelism" : "2" + } + } + } + }, + "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS options:{scan.parallelism=2, bounded=true}]]])", + "inputProperties" : [ ] + }, { + "id": 0, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`MySink`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ], + "options" : { + "connector" : "values", + "table-sink-class" : "DEFAULT" + } + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])" + } ], + "edges" : [ { + "source": 0, + "target": 0, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file
