This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f8e29badfe6531154373db2104707f0296e60880 Author: Sergey Nuyanzin <[email protected]> AuthorDate: Sun Nov 9 10:38:16 2025 +0100 [FLINK-38674][table] Support schema definition for Materialized tables while `CREATE` operation This closes #27222. --- .../dev/table/materialized-table/statements.md | 48 +++- .../dev/table/materialized-table/statements.md | 46 +++- .../src/main/codegen/includes/parserImpls.ftl | 20 +- .../sql/parser/ddl/SqlCreateMaterializedTable.java | 69 +++-- .../MaterializedTableStatementParserTest.java | 143 ++++++++++- .../AbstractCreateMaterializedTableConverter.java | 221 ++++++++++++++++ .../SqlCreateMaterializedTableConverter.java | 267 ++++++-------------- .../table/AbstractCreateTableConverter.java | 2 +- .../converters/table/MergeTableAsUtil.java | 2 +- .../planner/utils/MaterializedTableUtils.java | 28 ++- .../operations/SqlCTASNodeToOperationTest.java | 26 +- .../operations/SqlDdlToOperationConverterTest.java | 8 +- .../operations/SqlDmlToOperationConverterTest.java | 50 ++-- ...erializedTableNodeToOperationConverterTest.java | 277 +++++++++++++++++---- .../operations/SqlModelOperationConverterTest.java | 10 +- .../operations/SqlNodeToCallOperationTest.java | 4 +- .../SqlNodeToOperationConversionTestBase.java | 6 +- .../SqlRTASNodeToOperationConverterTest.java | 30 +-- .../SqlShowToOperationConverterTest.java | 6 +- 19 files changed, 895 insertions(+), 368 deletions(-) diff --git a/docs/content.zh/docs/dev/table/materialized-table/statements.md b/docs/content.zh/docs/dev/table/materialized-table/statements.md index a27bce3093f..d65491c9084 100644 --- a/docs/content.zh/docs/dev/table/materialized-table/statements.md +++ b/docs/content.zh/docs/dev/table/materialized-table/statements.md @@ -36,7 +36,11 @@ Flink SQL 目前支持以下物化表操作: ``` CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name -[ ([ <table_constraint> ]) ] +[( + { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n] + [ <watermark_definition> ] + [ <table_constraint> ][ , ...n] +)] [COMMENT table_comment] @@ -44,14 +48,29 @@ CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name [WITH (key1=val1, key2=val2, ...)] -FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY } +[FRESHNESS = INTERVAL '<num>' { SECOND[S] | MINUTE[S] | HOUR[S] | DAY[S] }] [REFRESH_MODE = { CONTINUOUS | FULL }] AS <select_statement> +<physical_column_definition>: + column_name column_type [ <column_constraint> ] [COMMENT column_comment] + +<column_constraint>: + [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED + <table_constraint>: [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED + +<metadata_column_definition>: + column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ] + +<computed_column_definition>: + column_name AS computed_column_expression [COMMENT column_comment] + +<watermark_definition>: + WATERMARK FOR rowtime_column_name AS watermark_strategy_expression ``` ## PRIMARY KEY @@ -270,9 +289,32 @@ CREATE MATERIALIZED TABLE my_materialized_table_full GROUP BY p.ds, p.product_id, p.product_name ``` +And same materialized table with explicitly specified columns + +```sql +CREATE MATERIALIZED TABLE my_materialized_table_full ( + ds, product_id, product_name, avg_sale_price, total_quantity) + ... +``` +The order of the columns doesn't need to be the same as in the query, +Flink will do reordering if required i.e. this will be also valid +```sql +CREATE MATERIALIZED TABLE my_materialized_table_full ( + product_id, product_name, ds, avg_sale_price, total_quantity) + ... +``` +Another way of doing this is putting name and data type +```sql +CREATE MATERIALIZED TABLE my_materialized_table_full ( + ds STRING, product_id STRING, product_name STRING, avg_sale_price DOUBLE, total_quantity BIGINT) + ... +``` +It might happen that types of columns are not the same, in that case implicit casts will be applied. +If for some of the combinations implicit cast is not supported then there will be validation error thrown. +Also, it is worth to note that reordering can also be done here. ## 限制 -- 不支持显式指定列名 +- Does not support explicitly specifying physical columns which are not used in the query - 不支持在 select 查询语句中引用临时表、临时视图或临时函数 # ALTER MATERIALIZED TABLE diff --git a/docs/content/docs/dev/table/materialized-table/statements.md b/docs/content/docs/dev/table/materialized-table/statements.md index 07784bb3ede..6e9c0b153b8 100644 --- a/docs/content/docs/dev/table/materialized-table/statements.md +++ b/docs/content/docs/dev/table/materialized-table/statements.md @@ -36,7 +36,11 @@ Flink SQL supports the following Materialized Table statements for now: ``` CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name -[ ([ <table_constraint> ]) ] +[( + { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n] + [ <watermark_definition> ] + [ <table_constraint> ][ , ...n] +)] [COMMENT table_comment] @@ -50,8 +54,23 @@ CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS <select_statement> +<physical_column_definition>: + column_name column_type [ <column_constraint> ] [COMMENT column_comment] + +<column_constraint>: + [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED + <table_constraint>: [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED + +<metadata_column_definition>: + column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ] + +<computed_column_definition>: + column_name AS computed_column_expression [COMMENT column_comment] + +<watermark_definition>: + WATERMARK FOR rowtime_column_name AS watermark_strategy_expression ``` ## PRIMARY KEY @@ -268,10 +287,33 @@ CREATE MATERIALIZED TABLE my_materialized_table_full GROUP BY p.ds, p.product_id, p.product_name ``` +And same materialized table with explicitly specified columns + +```sql +CREATE MATERIALIZED TABLE my_materialized_table_full ( + ds, product_id, product_name, avg_sale_price, total_quantity) + ... +``` +The order of the columns doesn't need to be the same as in the query, +Flink will do reordering if required i.e. this will be also valid +```sql +CREATE MATERIALIZED TABLE my_materialized_table_full ( + product_id, product_name, ds, avg_sale_price, total_quantity) + ... +``` +Another way of doing this is putting name and data type +```sql +CREATE MATERIALIZED TABLE my_materialized_table_full ( + ds STRING, product_id STRING, product_name STRING, avg_sale_price DOUBLE, total_quantity BIGINT) + ... +``` +It might happen that types of columns are not the same, in that case implicit casts will be applied. +If for some of the combinations implicit cast is not supported then there will be validation error thrown. +Also, it is worth to note that reordering can also be done here. ## Limitations -- Does not support explicitly specifying columns +- Does not support explicitly specifying physical columns which are not used in the query - Does not support referring to temporary tables, temporary views, or temporary functions in the select query # ALTER MATERIALIZED TABLE diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 764d0685b97..f7818dc81d8 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -1859,13 +1859,17 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar final SqlParserPos startPos = s.pos(); SqlIdentifier tableName; SqlCharStringLiteral comment = null; - SqlTableConstraint constraint = null; + List<SqlTableConstraint> constraints = new ArrayList<SqlTableConstraint>(); + SqlWatermark watermark = null; + SqlNodeList columnList = SqlNodeList.EMPTY; SqlDistribution distribution = null; SqlNodeList partitionColumns = SqlNodeList.EMPTY; SqlNodeList propertyList = SqlNodeList.EMPTY; SqlNode freshness = null; SqlLiteral refreshMode = null; SqlNode asQuery = null; + SqlParserPos pos = startPos; + boolean isColumnsIdentifiersOnly = false; } { <MATERIALIZED> @@ -1884,8 +1888,14 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar <TABLE> tableName = CompoundIdentifier() [ - <LPAREN> - constraint = TableConstraint() + <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();} + TableColumnsOrIdentifiers(pos, ctx) { + pos = pos.plus(getPos()); + isColumnsIdentifiersOnly = ctx.isColumnsIdentifiersOnly(); + columnList = new SqlNodeList(ctx.columnList, pos); + constraints = ctx.constraints; + watermark = ctx.watermark; + } <RPAREN> ] [ @@ -1939,7 +1949,9 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar return new SqlCreateMaterializedTable( startPos.plus(getPos()), tableName, - constraint, + columnList, + constraints, + watermark, comment, distribution, partitionColumns, diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java index 8c7fe2d5089..687226a6220 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java @@ -18,8 +18,11 @@ package org.apache.flink.sql.parser.ddl; +import org.apache.flink.sql.parser.ExtendedSqlNode; +import org.apache.flink.sql.parser.SqlConstraintValidator; import org.apache.flink.sql.parser.SqlUnparseUtils; import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; +import org.apache.flink.sql.parser.error.SqlValidateException; import org.apache.calcite.sql.SqlCharStringLiteral; import org.apache.calcite.sql.SqlCreate; @@ -37,21 +40,22 @@ import org.apache.calcite.util.ImmutableNullableList; import javax.annotation.Nullable; -import java.util.Collections; import java.util.List; import java.util.Optional; import static java.util.Objects.requireNonNull; /** CREATE MATERIALIZED TABLE DDL sql call. */ -public class SqlCreateMaterializedTable extends SqlCreate { +public class SqlCreateMaterializedTable extends SqlCreate implements ExtendedSqlNode { public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE MATERIALIZED TABLE", SqlKind.CREATE_TABLE); private final SqlIdentifier tableName; - private final @Nullable SqlTableConstraint tableConstraint; + private final SqlNodeList columnList; + + private final List<SqlTableConstraint> tableConstraints; private final @Nullable SqlCharStringLiteral comment; @@ -59,6 +63,8 @@ public class SqlCreateMaterializedTable extends SqlCreate { private final SqlNodeList partitionKeyList; + private final SqlWatermark watermark; + private final SqlNodeList propertyList; private final @Nullable SqlIntervalLiteral freshness; @@ -70,7 +76,9 @@ public class SqlCreateMaterializedTable extends SqlCreate { public SqlCreateMaterializedTable( SqlParserPos pos, SqlIdentifier tableName, - @Nullable SqlTableConstraint tableConstraint, + SqlNodeList columnList, + List<SqlTableConstraint> tableConstraints, + SqlWatermark watermark, @Nullable SqlCharStringLiteral comment, @Nullable SqlDistribution distribution, SqlNodeList partitionKeyList, @@ -80,7 +88,9 @@ public class SqlCreateMaterializedTable extends SqlCreate { SqlNode asQuery) { super(OPERATOR, pos, false, false); this.tableName = requireNonNull(tableName, "tableName should not be null"); - this.tableConstraint = tableConstraint; + this.columnList = columnList; + this.tableConstraints = tableConstraints; + this.watermark = watermark; this.comment = comment; this.distribution = distribution; this.partitionKeyList = @@ -100,8 +110,10 @@ public class SqlCreateMaterializedTable extends SqlCreate { public List<SqlNode> getOperandList() { return ImmutableNullableList.of( tableName, + columnList, + new SqlNodeList(tableConstraints, SqlParserPos.ZERO), + watermark, comment, - tableConstraint, partitionKeyList, propertyList, freshness, @@ -116,12 +128,20 @@ public class SqlCreateMaterializedTable extends SqlCreate { return tableName.names.toArray(new String[0]); } - public Optional<SqlCharStringLiteral> getComment() { - return Optional.ofNullable(comment); + public SqlNodeList getColumnList() { + return columnList; + } + + public List<SqlTableConstraint> getTableConstraints() { + return tableConstraints; + } + + public Optional<SqlWatermark> getWatermark() { + return Optional.ofNullable(watermark); } - public Optional<SqlTableConstraint> getTableConstraint() { - return Optional.ofNullable(tableConstraint); + public Optional<SqlCharStringLiteral> getComment() { + return Optional.ofNullable(comment); } public @Nullable SqlDistribution getDistribution() { @@ -150,20 +170,33 @@ public class SqlCreateMaterializedTable extends SqlCreate { return asQuery; } + /** Returns the column constraints plus the table constraints. */ + public List<SqlTableConstraint> getFullConstraints() { + return SqlConstraintValidator.getFullConstraints(tableConstraints, columnList); + } + + @Override + public void validate() throws SqlValidateException { + if (!isSchemaWithColumnsIdentifiersOnly()) { + SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList); + } + } + + public boolean isSchemaWithColumnsIdentifiersOnly() { + // CREATE MATERIALIZED TABLE supports passing only column identifiers in the column list. If + // the first column in the list is an identifier, then we assume the rest of the + // columns are identifiers as well. + return !getColumnList().isEmpty() && getColumnList().get(0) instanceof SqlIdentifier; + } + @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("CREATE MATERIALIZED TABLE"); tableName.unparse(writer, leftPrec, rightPrec); - if (tableConstraint != null) { - writer.newlineAndIndent(); + if (!columnList.isEmpty() || !tableConstraints.isEmpty() || watermark != null) { SqlUnparseUtils.unparseTableSchema( - writer, - leftPrec, - rightPrec, - SqlNodeList.EMPTY, - Collections.singletonList(tableConstraint), - null); + writer, leftPrec, rightPrec, columnList, tableConstraints, watermark); } if (comment != null) { diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java index d089ecc8079..29f5a52f049 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java @@ -39,13 +39,23 @@ class MaterializedTableStatementParserTest { @ParameterizedTest(name = "{index}: {0}") @MethodSource("inputForCreateMaterializedTable") - void testCreateMaterializedTable(Map.Entry<String, String> sqlToExpected) { + void testCreateMaterializedTable( + final String testName, final Map.Entry<String, String> sqlToExpected) { final String sql = sqlToExpected.getKey(); final String expected = sqlToExpected.getValue(); sql(sql).ok(expected); } + @Test + void testCreateMaterializedTableWithWrongSchema() { + final String sql = + "CREATE MATERIALIZED TABLE tbl1\n" + + "(a, b ^STRING^)\n" + + "AS SELECT a, b, h, t m FROM source"; + sql(sql).fails("(?s).*Encountered \"STRING\" at line 2, column 7.*"); + } + @Test void testCreateMaterializedTableWithUnsupportedFreshnessInterval() { final String sql = @@ -373,17 +383,23 @@ class MaterializedTableStatementParserTest { private static Stream<Arguments> inputForCreateMaterializedTable() { return Stream.of( - Arguments.of(fullExample()), - Arguments.of(withoutTableConstraint()), - Arguments.of(withPrimaryKey()), - Arguments.of(withoutFreshness())); + Arguments.of("Full example", fullExample()), + Arguments.of("With columns", withColumns()), + Arguments.of("With columns and watermarks", withColumnsAndWatermark()), + Arguments.of("Without table constraint", withoutTableConstraint()), + Arguments.of("With primary key", withPrimaryKey()), + Arguments.of("Without freshness", withoutFreshness()), + Arguments.of("With column identifiers only", withColumnsIdentifiersOnly())); } private static Map.Entry<String, String> fullExample() { return new AbstractMap.SimpleEntry<>( "CREATE MATERIALIZED TABLE tbl1\n" + "(\n" - + " PRIMARY KEY (a, b)\n" + + " ts timestamp(3),\n" + + " id varchar,\n" + + " watermark FOR ts AS ts - interval '3' second,\n" + + " PRIMARY KEY (id)\n" + ")\n" + "COMMENT 'table comment'\n" + "DISTRIBUTED BY HASH (a) INTO 4 BUCKETS\n" @@ -394,9 +410,11 @@ class MaterializedTableStatementParserTest { + ")\n" + "FRESHNESS = INTERVAL '3' MINUTES\n" + "AS SELECT a, b, h, t m FROM source", - "CREATE MATERIALIZED TABLE `TBL1`\n" - + "(\n" - + " PRIMARY KEY (`A`, `B`)\n" + "CREATE MATERIALIZED TABLE `TBL1` (\n" + + " `TS` TIMESTAMP(3),\n" + + " `ID` VARCHAR,\n" + + " PRIMARY KEY (`ID`),\n" + + " WATERMARK FOR `TS` AS (`TS` - INTERVAL '3' SECOND)\n" + ")\n" + "COMMENT 'table comment'\n" + "DISTRIBUTED BY HASH(`A`) INTO 4 BUCKETS\n" @@ -421,8 +439,7 @@ class MaterializedTableStatementParserTest { + "FRESHNESS = INTERVAL '3' MINUTE\n" + "REFRESH_MODE = FULL\n" + "AS SELECT a, b, h, t m FROM source", - "CREATE MATERIALIZED TABLE `TBL1`\n" - + "(\n" + "CREATE MATERIALIZED TABLE `TBL1` (\n" + " PRIMARY KEY (`A`, `B`)\n" + ")\n" + "COMMENT 'table comment'\n" @@ -460,8 +477,7 @@ class MaterializedTableStatementParserTest { + " 'kafka.topic' = 'log.test'\n" + ")\n" + "AS SELECT a, b, h, t m FROM source", - "CREATE MATERIALIZED TABLE `TBL1`\n" - + "(\n" + "CREATE MATERIALIZED TABLE `TBL1` (\n" + " PRIMARY KEY (`A`, `B`)\n" + ")\n" + "WITH (\n" @@ -472,4 +488,105 @@ class MaterializedTableStatementParserTest { + "SELECT `A`, `B`, `H`, `T` AS `M`\n" + "FROM `SOURCE`"); } + + private static Map.Entry<String, String> withColumns() { + return new AbstractMap.SimpleEntry<>( + "CREATE MATERIALIZED TABLE tbl1\n" + + "(\n" + + " a INT, b STRING, h INT, m INT\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "DISTRIBUTED BY HASH (a) INTO 4 BUCKETS\n" + + "PARTITIONED BY (a, h)\n" + + "WITH (\n" + + " 'group.id' = 'latest', \n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "AS SELECT a, b, h, t m FROM source", + "CREATE MATERIALIZED TABLE `TBL1` (\n" + + " `A` INTEGER,\n" + + " `B` STRING,\n" + + " `H` INTEGER,\n" + + " `M` INTEGER\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "DISTRIBUTED BY HASH(`A`) INTO 4 BUCKETS\n" + + "PARTITIONED BY (`A`, `H`)\n" + + "WITH (\n" + + " 'group.id' = 'latest',\n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "AS\n" + + "SELECT `A`, `B`, `H`, `T` AS `M`\n" + + "FROM `SOURCE`"); + } + + private static Map.Entry<String, String> withColumnsAndWatermark() { + return new AbstractMap.SimpleEntry<>( + "CREATE MATERIALIZED TABLE tbl1\n" + + "(\n" + + " ts timestamp(3),\n" + + " id varchar,\n" + + " watermark FOR ts AS ts - interval '3' second\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "DISTRIBUTED BY HASH (a) INTO 4 BUCKETS\n" + + "PARTITIONED BY (a, h)\n" + + "WITH (\n" + + " 'group.id' = 'latest', \n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "AS SELECT a, b, h, t m FROM source", + "CREATE MATERIALIZED TABLE `TBL1` (\n" + + " `TS` TIMESTAMP(3),\n" + + " `ID` VARCHAR,\n" + + " WATERMARK FOR `TS` AS (`TS` - INTERVAL '3' SECOND)\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "DISTRIBUTED BY HASH(`A`) INTO 4 BUCKETS\n" + + "PARTITIONED BY (`A`, `H`)\n" + + "WITH (\n" + + " 'group.id' = 'latest',\n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "AS\n" + + "SELECT `A`, `B`, `H`, `T` AS `M`\n" + + "FROM `SOURCE`"); + } + + private static Map.Entry<String, String> withColumnsIdentifiersOnly() { + return new AbstractMap.SimpleEntry<>( + "CREATE MATERIALIZED TABLE tbl1\n" + + "(a, b, h, m)\n" + + "COMMENT 'table comment'\n" + + "DISTRIBUTED BY HASH (a) INTO 4 BUCKETS\n" + + "PARTITIONED BY (a, h)\n" + + "WITH (\n" + + " 'group.id' = 'latest', \n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "AS SELECT a, b, h, t m FROM source", + "CREATE MATERIALIZED TABLE `TBL1` (\n" + + " `A`,\n" + + " `B`,\n" + + " `H`,\n" + + " `M`\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "DISTRIBUTED BY HASH(`A`) INTO 4 BUCKETS\n" + + "PARTITIONED BY (`A`, `H`)\n" + + "WITH (\n" + + " 'group.id' = 'latest',\n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "AS\n" + + "SELECT `A`, `B`, `H`, `T` AS `M`\n" + + "FROM `SOURCE`"); + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateMaterializedTableConverter.java new file mode 100644 index 00000000000..fb61e48b7ef --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateMaterializedTableConverter.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable; +import org.apache.flink.sql.parser.ddl.SqlRefreshMode; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; +import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableDistribution; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.planner.operations.PlannerQueryOperation; +import org.apache.flink.table.planner.utils.MaterializedTableUtils; +import org.apache.flink.table.planner.utils.OperationConverterUtils; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; + +import org.apache.calcite.sql.SqlNode; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.DATE_FORMATTER; +import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.PARTITION_FIELDS; + +/** + * Abstract class for converting {@link SqlCreateMaterializedTable} and it's children to create + * materialized table operations. + */ +public abstract class AbstractCreateMaterializedTableConverter<T extends SqlCreateMaterializedTable> + implements SqlNodeConverter<T> { + /** Context of create table converters while merging source and derived items. */ + protected interface MergeContext { + Schema getMergedSchema(); + + Map<String, String> getMergedTableOptions(); + + List<String> getMergedPartitionKeys(); + + Optional<TableDistribution> getMergedTableDistribution(); + + String getMergedDefinitionQuery(); + + ResolvedSchema getMergedQuerySchema(); + } + + protected abstract MergeContext getMergeContext( + T sqlCreateMaterializedTable, ConvertContext context); + + protected final Optional<TableDistribution> getDerivedTableDistribution( + T sqlCreateMaterializedTable) { + return Optional.ofNullable(sqlCreateMaterializedTable.getDistribution()) + .map(OperationConverterUtils::getDistributionFromSqlDistribution); + } + + protected final List<String> getDerivedPartitionKeys(T sqlCreateMaterializedTable) { + return OperationConverterUtils.getColumnNames( + sqlCreateMaterializedTable.getPartitionKeyList()); + } + + protected final Map<String, String> getDerivedTableOptions(T sqlCreateMaterializedTable) { + return OperationConverterUtils.getProperties(sqlCreateMaterializedTable.getPropertyList()); + } + + protected final IntervalFreshness getDerivedFreshness(T sqlCreateMaterializedTable) { + return Optional.ofNullable(sqlCreateMaterializedTable.getFreshness()) + .map(MaterializedTableUtils::getMaterializedTableFreshness) + .orElse(null); + } + + protected final ResolvedSchema getQueryResolvedSchema( + T sqlCreateMaterializedTable, ConvertContext context) { + SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery(); + SqlNode validateQuery = context.getSqlValidator().validate(selectQuery); + + PlannerQueryOperation queryOperation = + new PlannerQueryOperation( + context.toRelRoot(validateQuery).project(), + () -> context.toQuotedSqlString(validateQuery)); + return queryOperation.getResolvedSchema(); + } + + protected final LogicalRefreshMode getDerivedLogicalRefreshMode(T sqlCreateMaterializedTable) { + SqlRefreshMode sqlRefreshMode = + Optional.ofNullable(sqlCreateMaterializedTable.getRefreshMode()) + .map(mode -> mode.getValueAs(SqlRefreshMode.class)) + .orElse(null); + + return MaterializedTableUtils.deriveLogicalRefreshMode(sqlRefreshMode); + } + + protected final RefreshMode getDerivedRefreshMode(LogicalRefreshMode logicalRefreshMode) { + return MaterializedTableUtils.fromLogicalRefreshModeToRefreshMode(logicalRefreshMode); + } + + protected final String getDerivedDefinitionQuery( + T sqlCreateMaterializedTable, ConvertContext context) { + SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery(); + SqlNode validatedQuery = context.getSqlValidator().validate(selectQuery); + return context.toQuotedSqlString(validatedQuery); + } + + protected final String getComment(T sqlCreateMaterializedTable) { + return OperationConverterUtils.getComment(sqlCreateMaterializedTable.getComment()); + } + + protected final ResolvedCatalogMaterializedTable getResolvedCatalogMaterializedTable( + T sqlCreateMaterializedTable, ConvertContext context) { + final MergeContext mergeContext = getMergeContext(sqlCreateMaterializedTable, context); + final List<String> partitionKeys = mergeContext.getMergedPartitionKeys(); + final Schema schema = mergeContext.getMergedSchema(); + final ResolvedSchema querySchema = mergeContext.getMergedQuerySchema(); + final Map<String, String> tableOptions = mergeContext.getMergedTableOptions(); + verifyPartitioningColumnsExist(querySchema, partitionKeys, tableOptions); + + final TableDistribution distribution = + mergeContext.getMergedTableDistribution().orElse(null); + final String comment = getComment(sqlCreateMaterializedTable); + final String definitionQuery = mergeContext.getMergedDefinitionQuery(); + final IntervalFreshness intervalFreshness = getDerivedFreshness(sqlCreateMaterializedTable); + final LogicalRefreshMode logicalRefreshMode = + getDerivedLogicalRefreshMode(sqlCreateMaterializedTable); + final RefreshMode refreshMode = getDerivedRefreshMode(logicalRefreshMode); + return context.getCatalogManager() + .resolveCatalogMaterializedTable( + CatalogMaterializedTable.newBuilder() + .schema(schema) + .comment(comment) + .distribution(distribution) + .partitionKeys(partitionKeys) + .options(tableOptions) + .definitionQuery(definitionQuery) + .freshness(intervalFreshness) + .logicalRefreshMode(logicalRefreshMode) + .refreshMode(refreshMode) + .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING) + .build()); + } + + protected final ObjectIdentifier getIdentifier( + SqlCreateMaterializedTable node, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(node.fullTableName()); + return context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + } + + private void verifyPartitioningColumnsExist( + ResolvedSchema schema, List<String> partitionKeys, Map<String, String> tableOptions) { + final Set<String> partitionFieldOptions = + tableOptions.keySet().stream() + .filter(k -> k.startsWith(PARTITION_FIELDS)) + .collect(Collectors.toSet()); + + for (String partitionKey : partitionKeys) { + if (schema.getColumn(partitionKey).isEmpty()) { + throw new ValidationException( + String.format( + "Partition column '%s' not defined in the query schema. Available columns: [%s].", + partitionKey, + schema.getColumnNames().stream() + .collect(Collectors.joining("', '", "'", "'")))); + } + } + + // verify partition key used by materialized table partition option + // partition.fields.#.date-formatter whether exist + for (String partitionOption : partitionFieldOptions) { + String partitionKey = + partitionOption.substring( + PARTITION_FIELDS.length() + 1, + partitionOption.length() - (DATE_FORMATTER.length() + 1)); + // partition key used in option partition.fields.#.date-formatter must be existed + if (!partitionKeys.contains(partitionKey)) { + throw new ValidationException( + String.format( + "Column '%s' referenced by materialized table option '%s' isn't a partition column. Available partition columns: [%s].", + partitionKey, + partitionOption, + partitionKeys.stream() + .collect(Collectors.joining("', '", "'", "'")))); + } + + // partition key used in option partition.fields.#.date-formatter must be string type + LogicalType partitionKeyType = + schema.getColumn(partitionKey).get().getDataType().getLogicalType(); + if (!partitionKeyType + .getTypeRoot() + .getFamilies() + .contains(LogicalTypeFamily.CHARACTER_STRING)) { + throw new ValidationException( + String.format( + "Materialized table option '%s' only supports referring to char, varchar and string type partition column. Column %s type is %s.", + partitionOption, partitionKey, partitionKeyType.asSummaryString())); + } + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java index 2107b209b27..3beb5596a90 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java @@ -18,234 +18,109 @@ package org.apache.flink.table.planner.operations.converters; -import org.apache.flink.sql.parser.SqlConstraintValidator; import org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable; -import org.apache.flink.sql.parser.ddl.SqlRefreshMode; -import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; -import org.apache.flink.sql.parser.error.SqlValidateException; +import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.CatalogMaterializedTable; -import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode; -import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; -import org.apache.flink.table.catalog.Column; -import org.apache.flink.table.catalog.IntervalFreshness; -import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.TableDistribution; -import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; -import org.apache.flink.table.planner.operations.PlannerQueryOperation; -import org.apache.flink.table.planner.utils.MaterializedTableUtils; -import org.apache.flink.table.planner.utils.OperationConverterUtils; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.planner.operations.converters.table.MergeTableAsUtil; import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; -import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; -import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.DATE_FORMATTER; -import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.PARTITION_FIELDS; -import static org.apache.flink.table.catalog.IntervalFreshness.validateFreshnessForCron; - -/** A converter for {@link SqlCreateMaterializedTable}. */ +/** + * A converter for {@link SqlCreateMaterializedTable} to {@link CreateMaterializedTableOperation}. + */ public class SqlCreateMaterializedTableConverter - implements SqlNodeConverter<SqlCreateMaterializedTable> { + extends AbstractCreateMaterializedTableConverter<SqlCreateMaterializedTable> { @Override public Operation convertSqlNode( SqlCreateMaterializedTable sqlCreateMaterializedTable, ConvertContext context) { - UnresolvedIdentifier unresolvedIdentifier = - UnresolvedIdentifier.of(sqlCreateMaterializedTable.fullTableName()); - ObjectIdentifier identifier = - context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); - - // get comment - String tableComment = - OperationConverterUtils.getComment(sqlCreateMaterializedTable.getComment()); - - // get options - final Map<String, String> tableOptions = - OperationConverterUtils.getProperties(sqlCreateMaterializedTable.getPropertyList()); - - // get freshness - IntervalFreshness intervalFreshness = - Optional.ofNullable(sqlCreateMaterializedTable.getFreshness()) - .map(MaterializedTableUtils::getMaterializedTableFreshness) - .orElse(null); - - // Get the logical refresh mode from SQL - SqlRefreshMode sqlRefreshMode = - Optional.ofNullable(sqlCreateMaterializedTable.getRefreshMode()) - .map(mode -> mode.getValueAs(SqlRefreshMode.class)) - .orElse(null); - - final LogicalRefreshMode logicalRefreshMode = - MaterializedTableUtils.deriveLogicalRefreshMode(sqlRefreshMode); - - // get the physical refresh mode from SQL - final RefreshMode refreshMode = - sqlRefreshMode == null - ? null - : MaterializedTableUtils.fromSqltoRefreshMode(sqlRefreshMode); - - if (CatalogMaterializedTable.RefreshMode.FULL == refreshMode && intervalFreshness != null) { - validateFreshnessForCron(intervalFreshness); - } - - // get query schema and definition query - SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery(); - SqlNode validatedQuery = context.getSqlValidator().validate(selectQuery); - - String definitionQuery = context.toQuotedSqlString(validatedQuery); - - PlannerQueryOperation queryOperation = - new PlannerQueryOperation( - context.toRelRoot(validatedQuery).project(), () -> definitionQuery); - - // get schema - ResolvedSchema resolvedSchema = queryOperation.getResolvedSchema(); - Schema.Builder builder = Schema.newBuilder().fromResolvedSchema(resolvedSchema); - - // get and verify partition key - List<String> partitionKeys = - OperationConverterUtils.getColumnNames( - sqlCreateMaterializedTable.getPartitionKeyList()); - verifyPartitioningColumnsExist( - resolvedSchema, - partitionKeys, - tableOptions.keySet().stream() - .filter(k -> k.startsWith(PARTITION_FIELDS)) - .collect(Collectors.toSet())); - - // verify and build primary key - sqlCreateMaterializedTable - .getTableConstraint() - .ifPresent( - sqlTableConstraint -> - verifyAndBuildPrimaryKey( - builder, resolvedSchema, sqlTableConstraint)); - - Optional<TableDistribution> tableDistribution = - Optional.ofNullable(sqlCreateMaterializedTable.getDistribution()) - .map(OperationConverterUtils::getDistributionFromSqlDistribution); - - CatalogMaterializedTable materializedTable = - CatalogMaterializedTable.newBuilder() - .schema(builder.build()) - .comment(tableComment) - .distribution(tableDistribution.orElse(null)) - .partitionKeys(partitionKeys) - .options(tableOptions) - .definitionQuery(definitionQuery) - .freshness(intervalFreshness) - .logicalRefreshMode(logicalRefreshMode) - .refreshMode(refreshMode) - .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING) - .build(); - return new CreateMaterializedTableOperation( - identifier, - context.getCatalogManager().resolveCatalogMaterializedTable(materializedTable)); + getIdentifier(sqlCreateMaterializedTable, context), + getResolvedCatalogMaterializedTable(sqlCreateMaterializedTable, context)); } - private static void verifyPartitioningColumnsExist( - ResolvedSchema resolvedSchema, - List<String> partitionKeys, - Set<String> partitionFieldOptions) { - // verify partition key whether exists - for (String partitionKey : partitionKeys) { - if (!resolvedSchema.getColumn(partitionKey).isPresent()) { - throw new ValidationException( - String.format( - "Partition column '%s' not defined in the query schema. Available columns: [%s].", - partitionKey, - resolvedSchema.getColumnNames().stream() - .collect(Collectors.joining("', '", "'", "'")))); + @Override + protected MergeContext getMergeContext( + SqlCreateMaterializedTable sqlCreateMaterializedTable, ConvertContext context) { + return new MergeContext() { + private final MergeTableAsUtil mergeTableAsUtil = new MergeTableAsUtil(context); + private final String definitionQuery = + SqlCreateMaterializedTableConverter.this.getDerivedDefinitionQuery( + sqlCreateMaterializedTable, context); + private final ResolvedSchema querySchema = + SqlCreateMaterializedTableConverter.this.getQueryResolvedSchema( + sqlCreateMaterializedTable, context); + + @Override + public Schema getMergedSchema() { + final Set<String> querySchemaColumnNames = + new HashSet<>(querySchema.getColumnNames()); + final SqlNodeList sqlNodeList = sqlCreateMaterializedTable.getColumnList(); + for (SqlNode column : sqlNodeList) { + if (!(column instanceof SqlRegularColumn)) { + continue; + } + + SqlRegularColumn physicalColumn = (SqlRegularColumn) column; + if (!querySchemaColumnNames.contains(physicalColumn.getName().getSimple())) { + throw new ValidationException( + String.format( + "Invalid as physical column '%s' is defined in the DDL, but is not used in a query column.", + physicalColumn.getName().getSimple())); + } + } + if (sqlCreateMaterializedTable.isSchemaWithColumnsIdentifiersOnly()) { + // If only column identifiers are provided, then these are used to + // order the columns in the schema. + return mergeTableAsUtil.reorderSchema(sqlNodeList, querySchema); + } else { + return mergeTableAsUtil.mergeSchemas( + sqlNodeList, + sqlCreateMaterializedTable.getWatermark().orElse(null), + sqlCreateMaterializedTable.getFullConstraints(), + querySchema); + } } - } - // verify partition key used by materialized table partition option - // partition.fields.#.date-formatter whether exist - for (String partitionOption : partitionFieldOptions) { - String partitionKey = - partitionOption.substring( - PARTITION_FIELDS.length() + 1, - partitionOption.length() - (DATE_FORMATTER.length() + 1)); - // partition key used in option partition.fields.#.date-formatter must be existed - if (!partitionKeys.contains(partitionKey)) { - throw new ValidationException( - String.format( - "Column '%s' referenced by materialized table option '%s' isn't a partition column. Available partition columns: [%s].", - partitionKey, - partitionOption, - partitionKeys.stream() - .collect(Collectors.joining("', '", "'", "'")))); + @Override + public Map<String, String> getMergedTableOptions() { + return SqlCreateMaterializedTableConverter.this.getDerivedTableOptions( + sqlCreateMaterializedTable); } - // partition key used in option partition.fields.#.date-formatter must be string type - LogicalType partitionKeyType = - resolvedSchema.getColumn(partitionKey).get().getDataType().getLogicalType(); - if (!partitionKeyType - .getTypeRoot() - .getFamilies() - .contains(LogicalTypeFamily.CHARACTER_STRING)) { - throw new ValidationException( - String.format( - "Materialized table option '%s' only supports referring to char, varchar and string type partition column. Column %s type is %s.", - partitionOption, partitionKey, partitionKeyType.asSummaryString())); + @Override + public List<String> getMergedPartitionKeys() { + return SqlCreateMaterializedTableConverter.this.getDerivedPartitionKeys( + sqlCreateMaterializedTable); } - } - } - private static void verifyAndBuildPrimaryKey( - Schema.Builder schemaBuilder, - ResolvedSchema resolvedSchema, - SqlTableConstraint sqlTableConstraint) { - // check constraint type - try { - SqlConstraintValidator.validate(sqlTableConstraint); - } catch (SqlValidateException e) { - throw new ValidationException( - String.format("Primary key validation failed: %s.", e.getMessage()), e); - } - - List<String> primaryKeyColumns = Arrays.asList(sqlTableConstraint.getColumnNames()); - for (String columnName : primaryKeyColumns) { - Optional<Column> columnOptional = resolvedSchema.getColumn(columnName); - if (!columnOptional.isPresent()) { - throw new ValidationException( - String.format( - "Primary key column '%s' not defined in the query schema. Available columns: [%s].", - columnName, - resolvedSchema.getColumnNames().stream() - .collect(Collectors.joining("', '", "'", "'")))); + @Override + public Optional<TableDistribution> getMergedTableDistribution() { + return SqlCreateMaterializedTableConverter.this.getDerivedTableDistribution( + sqlCreateMaterializedTable); } - if (columnOptional.get().getDataType().getLogicalType().isNullable()) { - throw new ValidationException( - String.format( - "Could not create a PRIMARY KEY with nullable column '%s'.\n" - + "A PRIMARY KEY column must be declared on non-nullable physical columns.", - columnName)); + @Override + public String getMergedDefinitionQuery() { + return definitionQuery; } - } - // build primary key - String constraintName = - sqlTableConstraint - .getConstraintName() - .orElseGet( - () -> - primaryKeyColumns.stream() - .collect(Collectors.joining("_", "PK_", ""))); - schemaBuilder.primaryKeyNamed(constraintName, primaryKeyColumns); + @Override + public ResolvedSchema getMergedQuerySchema() { + return querySchema; + } + }; } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractCreateTableConverter.java index 3a3de75c9f2..c22c52b9457 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractCreateTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractCreateTableConverter.java @@ -45,7 +45,7 @@ import java.util.stream.Collectors; public abstract class AbstractCreateTableConverter<T extends SqlCreateTable> implements SqlNodeConverter<T> { - /** Context of create table converters while mering source and derived items. */ + /** Context of create table converters while merging source and derived items. */ protected interface MergeContext { Schema getMergedSchema(ResolvedSchema schemaToMerge); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/MergeTableAsUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/MergeTableAsUtil.java index a96c607693c..1af8044cc25 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/MergeTableAsUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/MergeTableAsUtil.java @@ -351,7 +351,7 @@ public class MergeTableAsUtil { String name = ((SqlIdentifier) identifier).getSimple(); if (!sourceSchemaCols.containsKey(name)) { throw new ValidationException( - String.format("Column '%s' not found in the source schema. ", name)); + String.format("Column '%s' not found in the source schema.", name)); } sinkSchemaCols.put(name, sourceSchemaCols.get(name)); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java index ba765e40a69..2f32aac16f6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java @@ -21,7 +21,7 @@ package org.apache.flink.table.planner.utils; import org.apache.flink.annotation.Internal; import org.apache.flink.sql.parser.ddl.SqlRefreshMode; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode; import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; import org.apache.flink.table.catalog.IntervalFreshness; @@ -61,24 +61,23 @@ public class MaterializedTableUtils { } } - public static CatalogMaterializedTable.LogicalRefreshMode deriveLogicalRefreshMode( - SqlRefreshMode sqlRefreshMode) { + public static LogicalRefreshMode deriveLogicalRefreshMode(SqlRefreshMode sqlRefreshMode) { if (sqlRefreshMode == null) { - return CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC; + return LogicalRefreshMode.AUTOMATIC; } switch (sqlRefreshMode) { case FULL: - return CatalogMaterializedTable.LogicalRefreshMode.FULL; + return LogicalRefreshMode.FULL; case CONTINUOUS: - return CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS; + return LogicalRefreshMode.CONTINUOUS; default: throw new ValidationException( String.format("Unsupported logical refresh mode: %s.", sqlRefreshMode)); } } - public static RefreshMode fromSqltoRefreshMode(SqlRefreshMode sqlRefreshMode) { + public static RefreshMode fromSqlToRefreshMode(SqlRefreshMode sqlRefreshMode) { switch (sqlRefreshMode) { case FULL: return RefreshMode.FULL; @@ -88,4 +87,19 @@ public class MaterializedTableUtils { throw new IllegalArgumentException("Unknown refresh mode: " + sqlRefreshMode); } } + + public static RefreshMode fromLogicalRefreshModeToRefreshMode( + LogicalRefreshMode logicalRefreshMode) { + switch (logicalRefreshMode) { + case AUTOMATIC: + return null; + case FULL: + return RefreshMode.FULL; + case CONTINUOUS: + return RefreshMode.CONTINUOUS; + default: + throw new IllegalArgumentException( + "Unknown logical refresh mode: " + logicalRefreshMode); + } + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java index b4aa3ee7837..3add4e37f46 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java @@ -48,13 +48,13 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test base for testing convert CREATE TABLE AS statement to operation. */ -public class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTestBase { +class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTestBase { private static final Map<String, String> TABLE_OPTIONS = Map.of("connector", TestSimpleDynamicTableSourceFactory.IDENTIFIER()); @Test - public void testCreateTableAsWithNotFoundColumnIdentifiers() { + void testCreateTableAsWithNotFoundColumnIdentifiers() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema( @@ -76,7 +76,7 @@ public class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTest } @Test - public void testCreateTableAsWithMismatchIdentifiersLength() { + void testCreateTableAsWithMismatchIdentifiersLength() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema( @@ -100,7 +100,7 @@ public class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTest } @Test - public void testCreateTableAsWithColumns() { + void testCreateTableAsWithColumns() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema( @@ -142,7 +142,7 @@ public class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTest } @Test - public void testCreateTableAsWithColumnsOverridden() { + void testCreateTableAsWithColumnsOverridden() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema( @@ -181,7 +181,7 @@ public class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTest } @Test - public void testCreateTableAsWithOverriddenVirtualMetadataColumnsNotAllowed() { + void testCreateTableAsWithOverriddenVirtualMetadataColumnsNotAllowed() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema( @@ -207,7 +207,7 @@ public class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTest } @Test - public void testCreateTableAsWithOverriddenComputedColumnsNotAllowed() { + void testCreateTableAsWithOverriddenComputedColumnsNotAllowed() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema( @@ -231,7 +231,7 @@ public class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTest } @Test - public void testCreateTableAsWithPrimaryAndPartitionKey() { + void testCreateTableAsWithPrimaryAndPartitionKey() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema( @@ -266,7 +266,7 @@ public class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTest } @Test - public void testCreateTableAsWithWatermark() { + void testCreateTableAsWithWatermark() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema( @@ -301,7 +301,7 @@ public class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTest } @Test - public void testCreateTableAsWithNotNullColumnsAreNotAllowed() { + void testCreateTableAsWithNotNullColumnsAreNotAllowed() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema( @@ -323,7 +323,7 @@ public class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTest } @Test - public void testCreateTableAsWithIncompatibleImplicitCastTypes() { + void testCreateTableAsWithIncompatibleImplicitCastTypes() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema( @@ -348,7 +348,7 @@ public class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTest } @Test - public void testMergingCreateTableAsWithDistribution() { + void testMergingCreateTableAsWithDistribution() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema( @@ -388,7 +388,7 @@ public class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTest } @Test - public void testMergingCreateTableAsWitEmptyDistribution() { + void testMergingCreateTableAsWitEmptyDistribution() { CatalogTable catalogTable = CatalogTable.newBuilder() .schema( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java index 6a465210929..1ddd3a15860 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java @@ -1406,7 +1406,7 @@ class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBas checkAlterNonExistTable("alter table %s nonexistent drop watermark"); } - @ParameterizedTest(name = "[{index}] {0}") + @ParameterizedTest(name = "{index}: {0}") @MethodSource("provideCreateMaterializedTableTestCases") void createMaterializedTableWithVariousOptions( String testName, @@ -1429,7 +1429,7 @@ class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBas return Stream.of( Arguments.of( "with refresh mode continuous", - "CREATE MATERIALIZED TABLE users_shops (" + "CREATE MATERIALIZED TABLE users_shops (shop_id int, user_id int," + " PRIMARY KEY (user_id) not enforced)" + " WITH(\n" + " 'format' = 'debezium-json'\n" @@ -1439,14 +1439,14 @@ class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBas + " AS SELECT 1 as shop_id, 2 as user_id ", "CREATE MATERIALIZED TABLE: (materializedTable: " + "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n" - + " `shop_id` INT NOT NULL,\n" + + " `shop_id` INT,\n" + " `user_id` INT NOT NULL,\n" + " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" + "), comment='null', distribution=null, partitionKeys=[], " + "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', " + "freshness=INTERVAL '30' SECOND, logicalRefreshMode=CONTINUOUS, refreshMode=CONTINUOUS, " + "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n" - + " `shop_id` INT NOT NULL,\n" + + " `shop_id` INT,\n" + " `user_id` INT NOT NULL,\n" + " CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" + ")}], identifier: [`builtin`.`default`.`users_shops`])", diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java index afe84800fd3..89d2ee48822 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java @@ -53,34 +53,34 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.InstanceOfAssertFactories.type; /** Test cases for the DML statements for {@link SqlNodeToOperationConversion}. */ -public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversionTestBase { +class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversionTestBase { @Test - public void testExplainWithSelect() { + void testExplainWithSelect() { final String sql = "explain select * from t1"; checkExplainSql(sql); } @Test - public void testExplainWithInsert() { + void testExplainWithInsert() { final String sql = "explain insert into t2 select * from t1"; checkExplainSql(sql); } @Test - public void testExplainWithUnion() { + void testExplainWithUnion() { final String sql = "explain select * from t1 union select * from t2"; checkExplainSql(sql); } @Test - public void testExplainWithExplainDetails() { + void testExplainWithExplainDetails() { String sql = "explain changelog_mode, estimated_cost, json_execution_plan select * from t1"; checkExplainSql(sql); } @Test - public void testSqlInsertWithStaticPartition() { + void testSqlInsertWithStaticPartition() { final String sql = "insert into t1 partition(a=1) select b, c, d from t2"; FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); @@ -93,7 +93,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testSqlInsertWithDynamicTableOptions() { + void testSqlInsertWithDynamicTableOptions() { final String sql = "insert into t1 /*+ OPTIONS('k1'='v1', 'k2'='v2') */\n" + "select a, b, c, d from t2"; @@ -109,7 +109,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testDynamicTableWithInvalidOptions() { + void testDynamicTableWithInvalidOptions() { final String sql = "select * from t1 /*+ OPTIONS('opt1', 'opt2') */"; FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); @@ -120,7 +120,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testBeginStatementSet() { + void testBeginStatementSet() { final String sql = "BEGIN STATEMENT SET"; Operation operation = parse(sql); assertThat(operation).isInstanceOf(BeginStatementSetOperation.class); @@ -131,7 +131,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testEnd() { + void testEnd() { final String sql = "END"; Operation operation = parse(sql); assertThat(operation).isInstanceOf(EndStatementSetOperation.class); @@ -142,7 +142,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testSqlRichExplainWithSelect() { + void testSqlRichExplainWithSelect() { final String sql = "explain plan for select a, b, c, d from t2"; FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); @@ -151,7 +151,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testSqlRichExplainWithInsert() { + void testSqlRichExplainWithInsert() { final String sql = "explain plan for insert into t1 select a, b, c, d from t2"; FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); @@ -160,7 +160,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testSqlRichExplainWithStatementSet() { + void testSqlRichExplainWithStatementSet() { final String sql = "explain plan for statement set begin " + "insert into t1 select a, b, c, d from t2 where a > 1;" @@ -173,7 +173,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testExplainDetailsWithSelect() { + void testExplainDetailsWithSelect() { final String sql = "explain estimated_cost, changelog_mode, plan_advice select a, b, c, d from t2"; FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); @@ -182,7 +182,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testExplainDetailsWithInsert() { + void testExplainDetailsWithInsert() { final String sql = "explain estimated_cost, changelog_mode, plan_advice insert into t1 select a, b, c, d from t2"; FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); @@ -191,7 +191,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testExplainDetailsWithStatementSet() { + void testExplainDetailsWithStatementSet() { final String sql = "explain estimated_cost, changelog_mode, plan_advice statement set begin " + "insert into t1 select a, b, c, d from t2 where a > 1;" @@ -215,7 +215,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testSqlExecuteWithStatementSet() { + void testSqlExecuteWithStatementSet() { final String sql = "execute statement set begin " + "insert into t1 select a, b, c, d from t2 where a > 1;" @@ -228,7 +228,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testSqlRichExplainWithExecuteStatementSet() { + void testSqlRichExplainWithExecuteStatementSet() { final String sql = "EXPLAIN EXECUTE STATEMENT SET BEGIN " + "INSERT INTO t1 SELECT a, b, c, d FROM t2 WHERE a > 1;" @@ -241,7 +241,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testSqlExecuteWithInsert() { + void testSqlExecuteWithInsert() { final String sql = "execute insert into t1 select a, b, c, d from t2 where a > 1"; FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); @@ -250,7 +250,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testSqlRichExplainWithExecuteInsert() { + void testSqlRichExplainWithExecuteInsert() { final String sql = "EXPLAIN EXECUTE INSERT INTO t1 SELECT a, b, c, d FROM t2"; FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); @@ -259,7 +259,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testSqlExecuteWithSelect() { + void testSqlExecuteWithSelect() { final String sql = "execute select a, b, c, d from t2 where a > 1"; FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); @@ -268,7 +268,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testSqlRichExplainWithExecuteSelect() { + void testSqlRichExplainWithExecuteSelect() { final String sql = "EXPLAIN EXECUTE SELECT a, b, c, d FROM t2"; FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); @@ -277,7 +277,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testDelete() throws Exception { + void testDelete() throws Exception { Map<String, String> options = new HashMap<>(); options.put("connector", TestUpdateDeleteTableFactory.IDENTIFIER); CatalogTable catalogTable = @@ -311,7 +311,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testUpdate() throws Exception { + void testUpdate() throws Exception { Map<String, String> options = new HashMap<>(); options.put("connector", TestUpdateDeleteTableFactory.IDENTIFIER); CatalogTable catalogTable = @@ -340,7 +340,7 @@ public class SqlDmlToOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testTruncateTable() { + void testTruncateTable() { String sql = "TRUNCATE TABLE t1"; FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index 67a828d7fe0..460d8a91c91 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.operations; +import org.apache.flink.sql.parser.error.SqlValidateException; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.FunctionDescriptor; import org.apache.flink.table.api.Schema; @@ -34,6 +35,7 @@ import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.catalog.WatermarkSpec; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; @@ -50,8 +52,13 @@ import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -62,11 +69,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for the materialized table statements for {@link SqlNodeToOperationConversion}. */ -public class SqlMaterializedTableNodeToOperationConverterTest +class SqlMaterializedTableNodeToOperationConverterTest extends SqlNodeToOperationConversionTestBase { @BeforeEach - public void before() throws TableAlreadyExistException, DatabaseNotExistException { + void before() throws TableAlreadyExistException, DatabaseNotExistException { super.before(); final ObjectPath path3 = new ObjectPath(catalogManager.getCurrentDatabase(), "t3"); final Schema tableSchema = @@ -284,7 +291,7 @@ public class SqlMaterializedTableNodeToOperationConverterTest + ")\n" + "FRESHNESS = INTERVAL '30' SECOND\n" + "REFRESH_MODE = FULL\n" - + "AS SELECT a, f1, f2 FROM t1,LATERAL TABLE(myFunc(b)) as T(f1, f2)"; + + "AS SELECT a, f1, f2 FROM t1, LATERAL TABLE(myFunc(b)) as T(f1, f2)"; Operation operation = parse(sql); assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class); @@ -397,9 +404,8 @@ public class SqlMaterializedTableNodeToOperationConverterTest + "AS SELECT * FROM t1"; assertThatThrownBy(() -> parse(sql)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Primary key validation failed: UNIQUE constraint is not supported yet."); + .isInstanceOf(SqlValidateException.class) + .hasMessageContaining("UNIQUE constraint is not supported yet"); // test primary key not defined in source table final String sql2 = @@ -412,7 +418,7 @@ public class SqlMaterializedTableNodeToOperationConverterTest assertThatThrownBy(() -> parse(sql2)) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Primary key column 'e' not defined in the query schema. Available columns: ['a', 'b', 'c', 'd']."); + "Primary key column 'e' is not defined in the schema at line 2, column 31"); // test primary key with nullable source column final String sql3 = @@ -424,7 +430,7 @@ public class SqlMaterializedTableNodeToOperationConverterTest assertThatThrownBy(() -> parse(sql3)) .isInstanceOf(ValidationException.class) - .hasMessageContaining("Could not create a PRIMARY KEY with nullable column 'd'."); + .hasMessageContaining("Invalid primary key 'ct1'. Column 'd' is nullable."); } @Test @@ -438,7 +444,7 @@ public class SqlMaterializedTableNodeToOperationConverterTest assertThatThrownBy(() -> parse(sql)) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Partition column 'e' not defined in the query schema. Available columns: ['a', 'b', 'c', 'd']."); + "Partition column 'e' not defined in the query schema. Available columns: ['a', 'b', 'c', 'd']"); final String sql2 = "CREATE MATERIALIZED TABLE mtbl1\n" @@ -516,6 +522,65 @@ public class SqlMaterializedTableNodeToOperationConverterTest "Materialized table freshness only support SECOND, MINUTE, HOUR, DAY as the time unit."); } + @ParameterizedTest + @MethodSource("testDataForCreateMaterializedTableFailedCase") + void createMaterializedTableFailedCase(String sql, String expectedErrorMsg) { + assertThatThrownBy(() -> parse(sql)) + .isInstanceOf(ValidationException.class) + .hasMessage(expectedErrorMsg); + } + + @ParameterizedTest + @MethodSource("testDataWithDifferentSchemasSuccessCase") + void createMaterializedTableSuccessCase(String sql, ResolvedSchema expected) { + CreateMaterializedTableOperation operation = (CreateMaterializedTableOperation) parse(sql); + assertThat(operation.getCatalogMaterializedTable().getResolvedSchema()).isEqualTo(expected); + } + + @Test + void createMaterializedTableWithWatermark() { + final String sql = + "CREATE MATERIALIZED TABLE users_shops (watermark for ts as ts - interval '2' second)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 as shop_id, 2 as user_id, cast(current_timestamp as timestamp(3)) ts"; + CreateMaterializedTableOperation operation = (CreateMaterializedTableOperation) parse(sql); + ResolvedSchema schema = operation.getCatalogMaterializedTable().getResolvedSchema(); + assertThat(schema.getColumns()) + .containsExactly( + Column.physical("shop_id", DataTypes.INT().notNull()), + Column.physical("user_id", DataTypes.INT().notNull()), + Column.physical("ts", DataTypes.TIMESTAMP(3).notNull())); + + assertThat(schema.getWatermarkSpecs()).hasSize(1); + WatermarkSpec watermarkSpec = schema.getWatermarkSpecs().get(0); + assertThat(watermarkSpec.getWatermarkExpression()) + .hasToString("`ts` - INTERVAL '2' SECOND"); + assertThat(schema.getPrimaryKey()).isEmpty(); + assertThat(schema.getIndexes()).isEmpty(); + } + + @Test + void createMaterializedTableWithWatermarkUsingColumnFromCreatePart() { + final String sql = + "CREATE MATERIALIZED TABLE users_shops (ts TIMESTAMP(3), watermark for ts as ts - interval '2' second)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT current_timestamp() as ts, 1 as shop_id, 2 as user_id"; + CreateMaterializedTableOperation operation = (CreateMaterializedTableOperation) parse(sql); + ResolvedSchema schema = operation.getCatalogMaterializedTable().getResolvedSchema(); + assertThat(schema.getColumns()) + .containsExactly( + Column.physical("ts", DataTypes.TIMESTAMP(3)), + Column.physical("shop_id", DataTypes.INT().notNull()), + Column.physical("user_id", DataTypes.INT().notNull())); + + assertThat(schema.getWatermarkSpecs()).hasSize(1); + WatermarkSpec watermarkSpec = schema.getWatermarkSpecs().get(0); + assertThat(watermarkSpec.getWatermarkExpression()) + .hasToString("`ts` - INTERVAL '2' SECOND"); + assertThat(schema.getPrimaryKey()).isEmpty(); + assertThat(schema.getIndexes()).isEmpty(); + } + @Test void testAlterMaterializedTableRefreshOperationWithPartitionSpec() { final String sql = @@ -531,7 +596,7 @@ public class SqlMaterializedTableNodeToOperationConverterTest } @Test - public void testAlterMaterializedTableRefreshOperationWithoutPartitionSpec() { + void testAlterMaterializedTableRefreshOperationWithoutPartitionSpec() { final String sql = "ALTER MATERIALIZED TABLE mtbl1 REFRESH"; Operation operation = parse(sql); @@ -644,49 +709,6 @@ public class SqlMaterializedTableNodeToOperationConverterTest + "FROM `builtin`.`default`.`t3` AS `t3`"))); } - @Test - void testAlterMaterializedTableAsQueryWithUnsupportedColumnChange() { - // 1. delete existing column - String sql1 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b FROM t3"; - assertThatThrownBy(() -> parse(sql1)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Failed to modify query because drop column is unsupported. When modifying a query, you can only append new columns at the end of original schema. The original schema has 4 columns, but the newly derived schema from the query has 2 columns."); - // 2. swap column position - String sql2 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, d, c FROM t3"; - assertThatThrownBy(() -> parse(sql2)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "When modifying the query of a materialized table, currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" - + "Column mismatch at position 2: Original column is [`c` INT], but new column is [`d` STRING]."); - // 3. change existing column type - String sql3 = - "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, cast(d as int) as d FROM t3"; - assertThatThrownBy(() -> parse(sql3)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "When modifying the query of a materialized table, currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" - + "Column mismatch at position 3: Original column is [`d` STRING], but new column is [`d` INT]."); - // 4. change existing column nullability - String sql4 = - "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, cast('d' as string) as d FROM t3"; - assertThatThrownBy(() -> parse(sql4)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "When modifying the query of a materialized table, currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" - + "Column mismatch at position 3: Original column is [`d` STRING], but new column is [`d` STRING NOT NULL]."); - } - - @Test - void testAlterAlterMaterializedTableAsQueryWithCatalogTable() { - // t1 is a CatalogTable not a Materialized Table - final String sql = "ALTER MATERIALIZED TABLE t1 AS SELECT * FROM t1"; - assertThatThrownBy(() -> parse(sql)) - .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Only materialized tables support modifying the definition query."); - } - @Test void testDropMaterializedTable() { final String sql = "DROP MATERIALIZED TABLE mtbl1"; @@ -706,4 +728,153 @@ public class SqlMaterializedTableNodeToOperationConverterTest .isEqualTo( "DROP MATERIALIZED TABLE: (identifier: [`builtin`.`default`.`mtbl1`], IfExists: [true])"); } + + private static Collection<Arguments> testDataForCreateMaterializedTableFailedCase() { + final Collection<Arguments> list = new ArrayList<>(); + list.addAll(create()); + list.addAll(alter()); + return list; + } + + private static Collection<Arguments> alter() { + return List.of( + Arguments.of( + "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b FROM t3", + "Failed to modify query because drop column is unsupported. When modifying " + + "a query, you can only append new columns at the end of original " + + "schema. The original schema has 4 columns, but the newly derived " + + "schema from the query has 2 columns."), + Arguments.of( + "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, d, c FROM t3", + "When modifying the query of a materialized table, currently only support " + + "appending columns at the end of original schema, dropping, " + + "renaming, and reordering columns are not supported.\n" + + "Column mismatch at position 2: Original column is [`c` INT], " + + "but new column is [`d` STRING]."), + Arguments.of( + "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, CAST(d AS INT) AS d FROM t3", + "When modifying the query of a materialized table, currently only support " + + "appending columns at the end of original schema, dropping, " + + "renaming, and reordering columns are not supported.\n" + + "Column mismatch at position 3: Original column is [`d` STRING], " + + "but new column is [`d` INT]."), + Arguments.of( + "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, CAST('d' AS STRING) AS d FROM t3", + "When modifying the query of a materialized table, currently only support " + + "appending columns at the end of original schema, dropping, " + + "renaming, and reordering columns are not supported.\n" + + "Column mismatch at position 3: Original column is [`d` STRING], " + + "but new column is [`d` STRING NOT NULL]."), + Arguments.of( + "ALTER MATERIALIZED TABLE t1 AS SELECT * FROM t1", + "Only materialized tables support modifying the definition query.")); + } + + private static Collection<Arguments> create() { + return List.of( + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops (shop_id)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 AS shop_id, 2 AS user_id", + "The number of columns in the column list must match the number of columns in the source schema."), + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops (id, name, address)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 AS shop_id, 2 AS user_id", + "The number of columns in the column list must match the number of columns in the source schema."), + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops (id, name)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 AS shop_id, 2 AS user_id", + "Column 'id' not found in the source schema."), + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops (shop_id STRING, user_id STRING)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 AS shop_id, 2 AS user_id", + "Incompatible types for sink column 'shop_id' at position 0. The source column has type 'INT NOT NULL', " + + "while the target column has type 'STRING'."), + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops (shop_id INT, WATERMARK FOR ts AS `ts` - INTERVAL '5' SECOND)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 AS shop_id, 2 AS user_id", + "The rowtime attribute field 'ts' is not defined in the table schema, at line 1, column 67\n" + + "Available fields: ['shop_id', 'user_id']"), + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops (shop_id INT, user_id INT, PRIMARY KEY(id) NOT ENFORCED)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 AS shop_id, 2 AS user_id", + "Primary key column 'id' is not defined in the schema at line 1, column 78"), + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops (WATERMARK FOR ts AS ts - INTERVAL '2' SECOND)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 AS shop_id, 2 AS user_id", + "The rowtime attribute field 'ts' is not defined in the table schema, at line 1, column 54\n" + + "Available fields: ['shop_id', 'user_id']"), + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops (a INT, b INT)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 AS shop_id, 2 AS user_id", + "Invalid as physical column 'a' is defined in the DDL, but is not used in a query column."), + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops (shop_id INT, b INT)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 AS shop_id, 2 AS user_id", + "Invalid as physical column 'b' is defined in the DDL, but is not used in a query column.")); + } + + private static Collection<Arguments> testDataWithDifferentSchemasSuccessCase() { + return List.of( + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops (shop_id, user_id)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 AS shop_id, 2 AS user_id", + ResolvedSchema.of( + Column.physical("shop_id", DataTypes.INT().notNull()), + Column.physical("user_id", DataTypes.INT().notNull()))), + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT CAST(1 AS DOUBLE) AS shop_id, CAST(2 AS STRING) AS user_id", + ResolvedSchema.of( + Column.physical("shop_id", DataTypes.DOUBLE().notNull()), + Column.physical("user_id", DataTypes.STRING().notNull()))), + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops (user_id, shop_id)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 AS shop_id, 2 AS user_id", + ResolvedSchema.of( + Column.physical("user_id", DataTypes.INT().notNull()), + Column.physical("shop_id", DataTypes.INT().notNull()))), + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops (user_id INT, shop_id BIGINT)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 AS shop_id, 2 AS user_id", + ResolvedSchema.of( + Column.physical("shop_id", DataTypes.BIGINT()), + Column.physical("user_id", DataTypes.INT()))), + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops (user_id INT, shop_id BIGINT, PRIMARY KEY(user_id) NOT ENFORCED)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 AS shop_id, 2 AS user_id", + new ResolvedSchema( + List.of( + Column.physical("shop_id", DataTypes.BIGINT()), + Column.physical("user_id", DataTypes.INT().notNull())), + List.of(), + org.apache.flink.table.catalog.UniqueConstraint.primaryKey( + "PK_user_id", List.of("user_id")), + List.of())), + Arguments.of( + "CREATE MATERIALIZED TABLE users_shops (PRIMARY KEY(user_id) NOT ENFORCED)" + + " FRESHNESS = INTERVAL '30' SECOND" + + " AS SELECT 1 AS shop_id, 2 AS user_id", + new ResolvedSchema( + List.of( + Column.physical("shop_id", DataTypes.INT().notNull()), + Column.physical("user_id", DataTypes.INT().notNull())), + List.of(), + org.apache.flink.table.catalog.UniqueConstraint.primaryKey( + "PK_user_id", List.of("user_id")), + List.of()))); + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlModelOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlModelOperationConverterTest.java index 53f45f58d0c..87bc54a5af1 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlModelOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlModelOperationConverterTest.java @@ -58,9 +58,9 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertNotNull; /** Test for testing convert model statement to operation. */ -public class SqlModelOperationConverterTest extends SqlNodeToOperationConversionTestBase { +class SqlModelOperationConverterTest extends SqlNodeToOperationConversionTestBase { @Test - public void testCreateModel() { + void testCreateModel() { final String sql = "CREATE MODEL model1 \n" + "INPUT(a bigint comment 'column a', b varchar, c int, d varchar)\n" @@ -102,7 +102,7 @@ public class SqlModelOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testDropModel() throws Exception { + void testDropModel() throws Exception { Catalog catalog = new GenericInMemoryCatalog("default", "default"); if (!catalogManager.getCatalog("cat1").isPresent()) { catalogManager.registerCatalog("cat1", catalog); @@ -135,7 +135,7 @@ public class SqlModelOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testDescribeModel() { + void testDescribeModel() { Operation operation = parse("DESCRIBE MODEL m1"); assertThat(operation).isInstanceOf(DescribeModelOperation.class); DescribeModelOperation describeModelOperation = (DescribeModelOperation) operation; @@ -202,7 +202,7 @@ public class SqlModelOperationConverterTest extends SqlNodeToOperationConversion } @Test - public void testAlterModel() throws Exception { + void testAlterModel() throws Exception { Catalog catalog = new GenericInMemoryCatalog("default", "default"); if (!catalogManager.getCatalog("cat1").isPresent()) { catalogManager.registerCatalog("cat1", catalog); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToCallOperationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToCallOperationTest.java index a15ac7fc71b..c47d2431080 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToCallOperationTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToCallOperationTest.java @@ -42,10 +42,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test cases for the call statements for {@link SqlNodeToOperationConversion}. */ -public class SqlNodeToCallOperationTest extends SqlNodeToOperationConversionTestBase { +class SqlNodeToCallOperationTest extends SqlNodeToOperationConversionTestBase { @BeforeEach - public void before() { + void before() { CatalogWithBuiltInProcedure procedureCatalog = new CatalogWithBuiltInProcedure("procedure_catalog"); catalogManager.registerCatalog("p1", procedureCatalog); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java index ef251378a5a..9ed19411a7a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java @@ -59,7 +59,7 @@ import java.util.function.Supplier; import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema; /** Test base for testing convert sql statement to operation. */ -public class SqlNodeToOperationConversionTestBase { +class SqlNodeToOperationConversionTestBase { private final boolean isStreamingMode = false; private final TableConfig tableConfig = TableConfig.getDefault(); protected final Catalog catalog = new GenericInMemoryCatalog("MockCatalog", "default"); @@ -96,7 +96,7 @@ public class SqlNodeToOperationConversionTestBase { plannerContext.getRexFactory()); @BeforeEach - public void before() throws TableAlreadyExistException, DatabaseNotExistException { + void before() throws TableAlreadyExistException, DatabaseNotExistException { catalogManager.initSchemaResolver( isStreamingMode, ExpressionResolverMocks.basicResolver(catalogManager, functionCatalog, parser), @@ -122,7 +122,7 @@ public class SqlNodeToOperationConversionTestBase { } @AfterEach - public void after() throws TableNotExistException { + void after() throws TableNotExistException { final ObjectPath path1 = new ObjectPath(catalogManager.getCurrentDatabase(), "t1"); final ObjectPath path2 = new ObjectPath(catalogManager.getCurrentDatabase(), "t2"); catalog.dropTable(path1, true); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java index 40114de579f..dba843b5eb7 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java @@ -46,10 +46,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test base for testing convert [CREATE OR] REPLACE TABLE AS statement to operation. */ -public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConversionTestBase { +class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConversionTestBase { @Test - public void testReplaceTableAs() { + void testReplaceTableAs() { String tableName = "replace_table"; String tableComment = "test table comment 表描述"; String sql = @@ -62,7 +62,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve } @Test - public void testReplaceTableAsWithOrderingColumns() { + void testReplaceTableAsWithOrderingColumns() { String tableName = "replace_table"; String sql = "REPLACE TABLE " @@ -78,7 +78,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve } @Test - public void testReplaceTableAsWithNotFoundColumnIdentifiers() { + void testReplaceTableAsWithNotFoundColumnIdentifiers() { String tableName = "replace_table"; String sql = "REPLACE TABLE " @@ -91,7 +91,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve } @Test - public void testReplaceTableAsWithMismatchIdentifiersLength() { + void testReplaceTableAsWithMismatchIdentifiersLength() { String tableName = "replace_table"; String sql = "REPLACE TABLE " @@ -106,7 +106,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve } @Test - public void testCreateOrReplaceTableAs() { + void testCreateOrReplaceTableAs() { String tableName = "create_or_replace_table"; String sql = "CREATE OR REPLACE TABLE " @@ -116,7 +116,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve } @Test - public void testCreateOrReplaceTableAsWithColumns() { + void testCreateOrReplaceTableAsWithColumns() { String tableName = "create_or_replace_table"; String sql = "CREATE OR REPLACE TABLE " @@ -136,7 +136,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve } @Test - public void testCreateOrReplaceTableAsWithColumnsOverridden() { + void testCreateOrReplaceTableAsWithColumnsOverridden() { String tableName = "create_or_replace_table"; String sql = "CREATE OR REPLACE TABLE " @@ -157,7 +157,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve } @Test - public void testCreateOrReplaceTableAsWithNotNullColumnsAreNotAllowed() { + void testCreateOrReplaceTableAsWithNotNullColumnsAreNotAllowed() { String tableName = "create_or_replace_table"; String sql = "CREATE OR REPLACE TABLE " @@ -171,7 +171,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve } @Test - public void testCreateOrReplaceTableAsWithOverriddenVirtualMetadataColumnsNotAllowed() { + void testCreateOrReplaceTableAsWithOverriddenVirtualMetadataColumnsNotAllowed() { String tableName = "create_or_replace_table"; String sql = "CREATE OR REPLACE TABLE " @@ -188,7 +188,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve } @Test - public void testCreateOrReplaceTableAsWithOverriddenComputedColumnsNotAllowed() { + void testCreateOrReplaceTableAsWithOverriddenComputedColumnsNotAllowed() { String tableName = "create_or_replace_table"; String sql = "CREATE OR REPLACE TABLE " @@ -204,7 +204,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve } @Test - public void testCreateOrReplaceTableAsWithIncompatibleImplicitCastTypes() { + void testCreateOrReplaceTableAsWithIncompatibleImplicitCastTypes() { String tableName = "create_or_replace_table"; String sql = "CREATE OR REPLACE TABLE " @@ -221,7 +221,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve } @Test - public void testCreateOrReplaceTableAsWithDistribution() { + void testCreateOrReplaceTableAsWithDistribution() { String tableName = "create_or_replace_table"; String sql = "CREATE OR REPLACE TABLE " @@ -240,7 +240,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve } @Test - public void testCreateOrReplaceTableAsWithPrimaryKey() { + void testCreateOrReplaceTableAsWithPrimaryKey() { String tableName = "create_or_replace_table"; String sql = "CREATE OR REPLACE TABLE " @@ -260,7 +260,7 @@ public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConve } @Test - public void testCreateOrReplaceTableAsWithWatermark() { + void testCreateOrReplaceTableAsWithWatermark() { String tableName = "create_or_replace_table"; String sql = "CREATE OR REPLACE TABLE " diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java index 60606aa9771..baab04a471e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java @@ -27,17 +27,17 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -public class SqlShowToOperationConverterTest extends SqlNodeToOperationConversionTestBase { +class SqlShowToOperationConverterTest extends SqlNodeToOperationConversionTestBase { @BeforeEach - public void before() throws TableAlreadyExistException, DatabaseNotExistException { + void before() throws TableAlreadyExistException, DatabaseNotExistException { // Do nothing // No need to create schema, tables and etc. since the test executes for unset catalog and // database } @AfterEach - public void after() throws TableNotExistException { + void after() throws TableNotExistException { // Do nothing }
