This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cc9540cacc2d775cc190ee6534794ec33fd07828 Author: Jane Chan <[email protected]> AuthorDate: Thu Dec 29 14:34:32 2022 +0800 [FLINK-22137][table] Support DROP column/constraint/watermark for ALTER TABLE statement This closes #21571 --- .../src/test/resources/sql/table.q | 120 +++++-- .../table/api/internal/TableEnvironmentImpl.java | 22 -- .../ddl/AlterTableDropConstraintOperation.java | 43 --- .../planner/expressions/ColumnReferenceFinder.java | 86 +++-- .../planner/operations/AlterSchemaConverter.java | 381 +++++++++++++++------ .../operations/SqlToOperationConverter.java | 69 ++-- .../expressions/ColumnReferenceFinderTest.java | 80 +++++ .../operations/SqlToOperationConverterTest.java | 337 ++++++++++-------- .../flink/table/api/TableEnvironmentTest.scala | 100 ++++++ 9 files changed, 856 insertions(+), 382 deletions(-) diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q index 36809085117..1d1764ac5e0 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/table.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q @@ -537,45 +537,109 @@ CREATE TABLE `default_catalog`.`default_database`.`orders2` ( !ok +# ========================================================================== +# test alter table drop column +# ========================================================================== + +alter table orders2 drop (amount1, product, cleaned_product); +[INFO] Execute statement succeed. +!info + +# verify table options using SHOW CREATE TABLE +show create table orders2; +CREATE TABLE `default_catalog`.`default_database`.`orders2` ( + `trade_order_id` BIGINT NOT NULL, + `ts` TIMESTAMP(3) NOT NULL, + `user` VARCHAR(2147483647), + `user_email` VARCHAR(2147483647) NOT NULL, + `product_id` BIGINT NOT NULL, + `ptime` AS PROCTIME(), + WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE, + CONSTRAINT `order_constraint` PRIMARY KEY (`trade_order_id`) NOT ENFORCED +) WITH ( + 'connector' = 'datagen' +) + +!ok + +# ========================================================================== +# test alter table drop primary key +# ========================================================================== + +alter table orders2 drop primary key; +[INFO] Execute statement succeed. +!info + +# verify table options using SHOW CREATE TABLE +show create table orders2; +CREATE TABLE `default_catalog`.`default_database`.`orders2` ( + `trade_order_id` BIGINT NOT NULL, + `ts` TIMESTAMP(3) NOT NULL, + `user` VARCHAR(2147483647), + `user_email` VARCHAR(2147483647) NOT NULL, + `product_id` BIGINT NOT NULL, + `ptime` AS PROCTIME(), + WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE +) WITH ( + 'connector' = 'datagen' +) + +!ok +# ========================================================================== +# test alter table drop watermark +# ========================================================================== + +alter table orders2 drop watermark; +[INFO] Execute statement succeed. +!info + +# verify table options using SHOW CREATE TABLE +show create table orders2; +CREATE TABLE `default_catalog`.`default_database`.`orders2` ( + `trade_order_id` BIGINT NOT NULL, + `ts` TIMESTAMP(3) NOT NULL, + `user` VARCHAR(2147483647), + `user_email` VARCHAR(2147483647) NOT NULL, + `product_id` BIGINT NOT NULL, + `ptime` AS PROCTIME() +) WITH ( + 'connector' = 'datagen' +) + +!ok # ========================================================================== # test describe table # ========================================================================== describe orders2; -+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+ -| name | type | null | key | extras | watermark | -+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+ -| trade_order_id | BIGINT | FALSE | PRI(trade_order_id) | | | -| ts | TIMESTAMP(3) *ROWTIME* | FALSE | | | `ts` - INTERVAL '1' MINUTE | -| user | STRING | TRUE | | | | -| user_email | STRING | FALSE | | | | -| product_id | BIGINT | FALSE | | | | -| product | VARCHAR(32) | TRUE | | | | -| cleaned_product | VARCHAR(32) | FALSE | | AS COALESCE(`product`, 'missing_sku') | | -| amount1 | INT | TRUE | | | | -| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | | -+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+ -9 rows in set ++----------------+-----------------------------+-------+-----+---------------+-----------+ +| name | type | null | key | extras | watermark | ++----------------+-----------------------------+-------+-----+---------------+-----------+ +| trade_order_id | BIGINT | FALSE | | | | +| ts | TIMESTAMP(3) | FALSE | | | | +| user | STRING | TRUE | | | | +| user_email | STRING | FALSE | | | | +| product_id | BIGINT | FALSE | | | | +| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | | ++----------------+-----------------------------+-------+-----+---------------+-----------+ +6 rows in set !ok # test desc table desc orders2; -+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+ -| name | type | null | key | extras | watermark | -+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+ -| trade_order_id | BIGINT | FALSE | PRI(trade_order_id) | | | -| ts | TIMESTAMP(3) *ROWTIME* | FALSE | | | `ts` - INTERVAL '1' MINUTE | -| user | STRING | TRUE | | | | -| user_email | STRING | FALSE | | | | -| product_id | BIGINT | FALSE | | | | -| product | VARCHAR(32) | TRUE | | | | -| cleaned_product | VARCHAR(32) | FALSE | | AS COALESCE(`product`, 'missing_sku') | | -| amount1 | INT | TRUE | | | | -| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | | -+-----------------+-----------------------------+-------+---------------------+---------------------------------------+----------------------------+ -9 rows in set ++----------------+-----------------------------+-------+-----+---------------+-----------+ +| name | type | null | key | extras | watermark | ++----------------+-----------------------------+-------+-----+---------------+-----------+ +| trade_order_id | BIGINT | FALSE | | | | +| ts | TIMESTAMP(3) | FALSE | | | | +| user | STRING | TRUE | | | | +| user_email | STRING | FALSE | | | | +| product_id | BIGINT | FALSE | | | | +| ptime | TIMESTAMP_LTZ(3) *PROCTIME* | FALSE | | AS PROCTIME() | | ++----------------+-----------------------------+-------+-----+---------------+-----------+ +6 rows in set !ok # ========================================================================== diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index bffbef65c1c..4a1d000cc5e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -47,8 +47,6 @@ import org.apache.flink.table.catalog.CatalogFunctionImpl; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogPartition; import org.apache.flink.table.catalog.CatalogPartitionSpec; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ConnectorCatalogTable; import org.apache.flink.table.catalog.ContextResolvedTable; @@ -125,7 +123,6 @@ import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation; import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation; import org.apache.flink.table.operations.ddl.AlterTableChangeOperation; -import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation; import org.apache.flink.table.operations.ddl.AlterTableOperation; import org.apache.flink.table.operations.ddl.AlterTableOptionsOperation; import org.apache.flink.table.operations.ddl.AlterTableRenameOperation; @@ -160,7 +157,6 @@ import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.utils.DataTypeUtils; -import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.table.utils.print.PrintStyle; import org.apache.flink.types.Row; import org.apache.flink.util.FlinkUserCodeClassLoaders; @@ -993,24 +989,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { alterTablePropertiesOp.getCatalogTable(), alterTablePropertiesOp.getTableIdentifier(), false); - } else if (alterTableOperation instanceof AlterTableDropConstraintOperation) { - AlterTableDropConstraintOperation dropConstraintOperation = - (AlterTableDropConstraintOperation) operation; - CatalogTable oriTable = - catalogManager - .getTable(dropConstraintOperation.getTableIdentifier()) - .get() - .getResolvedTable(); - CatalogTable newTable = - new CatalogTableImpl( - TableSchemaUtils.dropConstraint( - oriTable.getSchema(), - dropConstraintOperation.getConstraintName()), - oriTable.getPartitionKeys(), - oriTable.getOptions(), - oriTable.getComment()); - catalogManager.alterTable( - newTable, dropConstraintOperation.getTableIdentifier(), false); } else if (alterTableOperation instanceof AlterPartitionPropertiesOperation) { AlterPartitionPropertiesOperation alterPartPropsOp = (AlterPartitionPropertiesOperation) operation; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java deleted file mode 100644 index b3f7c7d448f..00000000000 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterTableDropConstraintOperation.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.operations.ddl; - -import org.apache.flink.table.catalog.ObjectIdentifier; - -/** Operation of "ALTER TABLE ADD [CONSTRAINT constraintName] ..." clause. * */ -public class AlterTableDropConstraintOperation extends AlterTableOperation { - private final String constraintName; - - public AlterTableDropConstraintOperation( - ObjectIdentifier tableIdentifier, String constraintName) { - super(tableIdentifier); - this.constraintName = constraintName; - } - - public String getConstraintName() { - return constraintName; - } - - @Override - public String asSummaryString() { - return String.format( - "ALTER TABLE %s DROP CONSTRAINT %s", - tableIdentifier.asSummaryString(), constraintName); - } -} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java index 7b22246de8d..d3593cb4d85 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java @@ -19,6 +19,9 @@ package org.apache.flink.table.planner.expressions; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionDefaultVisitor; @@ -29,33 +32,68 @@ import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; import org.apache.calcite.rex.RexInputRef; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** A finder used to look up referenced column name in a {@link ResolvedExpression}. */ public class ColumnReferenceFinder { private ColumnReferenceFinder() {} - public static Set<String> findReferencedColumn( - ResolvedExpression resolvedExpression, List<String> tableColumns) { - ColumnReferenceVisitor visitor = new ColumnReferenceVisitor(tableColumns); - visitor.visit(resolvedExpression); - return visitor.referencedColumns; + /** + * Find referenced column names that derive the computed column. + * + * @param columnName the name of the column + * @param schema the schema contains the computed column definition + * @return the referenced column names + */ + public static Set<String> findReferencedColumn(String columnName, ResolvedSchema schema) { + Column column = + schema.getColumn(columnName) + .orElseThrow( + () -> + new ValidationException( + String.format( + "The input column %s doesn't exist in the schema.", + columnName))); + if (!(column instanceof Column.ComputedColumn)) { + return Collections.emptySet(); + } + ColumnReferenceVisitor visitor = + new ColumnReferenceVisitor( + // the input ref index is based on a projection of non-computed columns + schema.getColumns().stream() + .filter(c -> !(c instanceof Column.ComputedColumn)) + .map(Column::getName) + .collect(Collectors.toList())); + return visitor.visit(((Column.ComputedColumn) column).getExpression()); + } + + /** + * Find referenced column names that derive the watermark expression. + * + * @param schema resolved columns contains the watermark expression. + * @return the referenced column names + */ + public static Set<String> findWatermarkReferencedColumn(ResolvedSchema schema) { + ColumnReferenceVisitor visitor = new ColumnReferenceVisitor(schema.getColumnNames()); + return schema.getWatermarkSpecs().stream() + .flatMap(spec -> visitor.visit(spec.getWatermarkExpression()).stream()) + .collect(Collectors.toSet()); } - private static class ColumnReferenceVisitor extends ExpressionDefaultVisitor<Void> { + private static class ColumnReferenceVisitor extends ExpressionDefaultVisitor<Set<String>> { private final List<String> tableColumns; - private final Set<String> referencedColumns; public ColumnReferenceVisitor(List<String> tableColumns) { this.tableColumns = tableColumns; - this.referencedColumns = new HashSet<>(); } @Override - public Void visit(Expression expression) { + public Set<String> visit(Expression expression) { if (expression instanceof LocalReferenceExpression) { return visit((LocalReferenceExpression) expression); } else if (expression instanceof FieldReferenceExpression) { @@ -70,38 +108,34 @@ public class ColumnReferenceFinder { } @Override - public Void visit(FieldReferenceExpression fieldReference) { - referencedColumns.add(fieldReference.getName()); - return null; + public Set<String> visit(FieldReferenceExpression fieldReference) { + return Collections.singleton(fieldReference.getName()); } - public Void visit(LocalReferenceExpression localReference) { - referencedColumns.add(localReference.getName()); - return null; + public Set<String> visit(LocalReferenceExpression localReference) { + return Collections.singleton(localReference.getName()); } - public Void visit(RexNodeExpression rexNode) { + public Set<String> visit(RexNodeExpression rexNode) { // get the referenced column ref in table Set<RexInputRef> inputRefs = FlinkRexUtil.findAllInputRefs(rexNode.getRexNode()); // get the referenced column name by index - inputRefs.forEach( - inputRef -> { - int index = inputRef.getIndex(); - referencedColumns.add(tableColumns.get(index)); - }); - return null; + return inputRefs.stream() + .map(inputRef -> tableColumns.get(inputRef.getIndex())) + .collect(Collectors.toSet()); } @Override - public Void visit(CallExpression call) { + public Set<String> visit(CallExpression call) { + Set<String> references = new HashSet<>(); for (Expression expression : call.getChildren()) { - visit(expression); + references.addAll(visit(expression)); } - return null; + return references; } @Override - protected Void defaultMethod(Expression expression) { + protected Set<String> defaultMethod(Expression expression) { throw new TableException("Unexpected expression: " + expression); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java index b00038ce20f..b0b5e65fdf1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java @@ -19,6 +19,10 @@ package org.apache.flink.table.planner.operations; import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd; +import org.apache.flink.sql.parser.ddl.SqlAlterTableDropColumn; +import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint; +import org.apache.flink.sql.parser.ddl.SqlAlterTableDropPrimaryKey; +import org.apache.flink.sql.parser.ddl.SqlAlterTableDropWatermark; import org.apache.flink.sql.parser.ddl.SqlAlterTableModify; import org.apache.flink.sql.parser.ddl.SqlAlterTableRenameColumn; import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema; @@ -29,9 +33,8 @@ import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; -import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.SchemaResolver; import org.apache.flink.table.catalog.TableChange; @@ -62,6 +65,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -93,6 +98,10 @@ public class AlterSchemaConverter { this.schemaResolver = schemaResolver; } + /** + * Convert ALTER TABLE ADD | MODIFY (<schema_component> [, <schema_component>, ...]) + * to generate an updated Schema. + */ public Schema applySchemaChange( SqlAlterTableSchema alterTableSchema, Schema originSchema, @@ -101,7 +110,7 @@ public class AlterSchemaConverter { SchemaConverter converter = strategy == AlterSchemaStrategy.ADD ? new AddSchemaConverter( - originalSchema, + originSchema, (FlinkTypeFactory) sqlValidator.getTypeFactory(), sqlValidator, constraintValidator, @@ -109,7 +118,7 @@ public class AlterSchemaConverter { schemaResolver, tableChangeCollector) : new ModifySchemaConverter( - originalSchema, + originSchema, (FlinkTypeFactory) sqlValidator.getTypeFactory(), sqlValidator, constraintValidator, @@ -122,78 +131,145 @@ public class AlterSchemaConverter { return converter.convert(); } + /** Convert ALTER TABLE RENAME col_name to new_col_name to generate an updated Schema. */ public Schema applySchemaChange( - SqlAlterTableRenameColumn renameColumn, ContextResolvedTable originalTable) { - String oldColumnName = getColumnName(renameColumn.getOriginColumnIdentifier()); + SqlAlterTableRenameColumn renameColumn, ResolvedCatalogTable originTable) { + String originColumnName = getColumnName(renameColumn.getOriginColumnIdentifier()); String newColumnName = getColumnName(renameColumn.getNewColumnIdentifier()); - List<String> tableColumns = - originalTable.getResolvedSchema().getColumns().stream() - .map(Column::getName) - .collect(Collectors.toList()); - // validate old column is exists or new column isn't duplicated or old column isn't - // referenced by computed column + // validate origin column is exists, new column name does not collide with existed column + // names, and origin column isn't referenced by computed column validateColumnName( - oldColumnName, + originColumnName, newColumnName, - tableColumns, - originalTable.getResolvedSchema(), - ((CatalogTable) originalTable.getResolvedTable()).getPartitionKeys()); - - // validate old column isn't referenced by watermark - List<WatermarkSpec> watermarkSpecs = originalTable.getResolvedSchema().getWatermarkSpecs(); - watermarkSpecs.forEach( - watermarkSpec -> { - String rowtimeAttribute = watermarkSpec.getRowtimeAttribute(); - Set<String> referencedColumns = - ColumnReferenceFinder.findReferencedColumn( - watermarkSpec.getWatermarkExpression(), tableColumns); - if (oldColumnName.equals(rowtimeAttribute) - || referencedColumns.contains(oldColumnName)) { - throw new ValidationException( - String.format( - "Old column %s is referred by watermark expression %s, " - + "currently doesn't allow to rename column which is " - + "referred by watermark expression.", - oldColumnName, watermarkSpec.asSummaryString())); + originTable.getResolvedSchema(), + originTable.getPartitionKeys()); + validateWatermark(originTable, originColumnName); + + // generate new schema + Schema.Builder schemaBuilder = Schema.newBuilder(); + buildUpdatedColumn( + schemaBuilder, + originTable, + (builder, column) -> { + if (column.getName().equals(originColumnName)) { + buildNewColumnFromOriginColumn(builder, column, newColumnName); + } else { + builder.fromColumns(Collections.singletonList(column)); } }); + buildUpdatedPrimaryKey( + schemaBuilder, + originTable, + (pk) -> pk.equals(originColumnName) ? newColumnName : pk); + buildUpdatedWatermark(schemaBuilder, originTable); + return schemaBuilder.build(); + } - Schema.Builder builder = Schema.newBuilder(); - // build column - Schema originSchema = originalTable.getTable().getUnresolvedSchema(); - originSchema - .getColumns() + /** Convert ALTER TABLE DROP (col1 [, col2, ...]) to generate an updated Schema. */ + public Schema applySchemaChange( + SqlAlterTableDropColumn dropColumn, ResolvedCatalogTable originTable) { + Set<String> columnsToDrop = new HashSet<>(); + dropColumn + .getColumnList() .forEach( - column -> { - if (oldColumnName.equals(column.getName())) { - buildNewColumnFromOriginColumn(builder, column, newColumnName); - } else { - builder.fromColumns(Collections.singletonList(column)); + identifier -> { + String name = getColumnName((SqlIdentifier) identifier); + if (!columnsToDrop.add(name)) { + throw new ValidationException( + String.format( + "%sDuplicate column `%s`.", EX_MSG_PREFIX, name)); } }); - // build primary key - Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey(); - if (originPrimaryKey.isPresent()) { - List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames(); - String constrainName = originPrimaryKey.get().getConstraintName(); - List<String> newPrimaryKeyNames = - originPrimaryKeyNames.stream() - .map(pkName -> pkName.equals(oldColumnName) ? newColumnName : pkName) - .collect(Collectors.toList()); - builder.primaryKeyNamed(constrainName, newPrimaryKeyNames); - } - - // build watermark - originSchema - .getWatermarkSpecs() - .forEach( - watermarkSpec -> - builder.watermark( - watermarkSpec.getColumnName(), - watermarkSpec.getWatermarkExpression())); - // generate new schema - return builder.build(); + Schema.Builder schemaBuilder = Schema.newBuilder(); + for (SqlNode columnIdentifier : dropColumn.getColumnList()) { + String columnToDrop = getColumnName((SqlIdentifier) columnIdentifier); + // validate the column to drop exists in the table schema, is not a primary key and + // does not derive any computed column + validateColumnName( + columnToDrop, + originTable.getResolvedSchema(), + originTable.getPartitionKeys(), + columnsToDrop); + validateWatermark(originTable, columnToDrop); + } + buildUpdatedColumn( + schemaBuilder, + originTable, + (builder, column) -> { + if (!columnsToDrop.contains(column.getName())) { + builder.fromColumns(Collections.singletonList(column)); + } + }); + buildUpdatedPrimaryKey(schemaBuilder, originTable, Function.identity()); + buildUpdatedWatermark(schemaBuilder, originTable); + return schemaBuilder.build(); + } + + /** Convert ALTER TABLE DROP PRIMARY KEY to generate an updated Schema. */ + public Schema applySchemaChange( + SqlAlterTableDropPrimaryKey dropPrimaryKey, ResolvedCatalogTable originTable) { + Optional<UniqueConstraint> pkConstraint = originTable.getResolvedSchema().getPrimaryKey(); + if (!pkConstraint.isPresent()) { + throw new ValidationException( + String.format( + "%sThe base table does not define any primary key.", EX_MSG_PREFIX)); + } + Schema.Builder schemaBuilder = Schema.newBuilder(); + buildUpdatedColumn( + schemaBuilder, + originTable, + (builder, column) -> builder.fromColumns(Collections.singletonList(column))); + buildUpdatedWatermark(schemaBuilder, originTable); + return schemaBuilder.build(); + } + + /** + * Convert ALTER TABLE DROP CONSTRAINT constraint_name to generate an updated {@link Schema}. + */ + public Schema applySchemaChange( + SqlAlterTableDropConstraint dropConstraint, ResolvedCatalogTable originTable) { + Optional<UniqueConstraint> pkConstraint = originTable.getResolvedSchema().getPrimaryKey(); + if (!pkConstraint.isPresent()) { + throw new ValidationException( + String.format( + "%sThe base table does not define any primary key.", EX_MSG_PREFIX)); + } + SqlIdentifier constraintIdentifier = dropConstraint.getConstraintName(); + String constraintName = pkConstraint.get().getName(); + if (constraintIdentifier != null + && !constraintIdentifier.getSimple().equals(constraintName)) { + throw new ValidationException( + String.format( + "%sThe base table does not define a primary key constraint named '%s'. " + + "Available constraint name: ['%s'].", + EX_MSG_PREFIX, constraintIdentifier.getSimple(), constraintName)); + } + Schema.Builder schemaBuilder = Schema.newBuilder(); + buildUpdatedColumn( + schemaBuilder, + originTable, + (builder, column) -> builder.fromColumns(Collections.singletonList(column))); + buildUpdatedWatermark(schemaBuilder, originTable); + return schemaBuilder.build(); + } + + /** Convert ALTER TABLE DROP WATERMARK to generate an updated {@link Schema}. */ + public Schema applySchemaChange( + SqlAlterTableDropWatermark dropWatermark, ResolvedCatalogTable originTable) { + if (originTable.getResolvedSchema().getWatermarkSpecs().isEmpty()) { + throw new ValidationException( + String.format( + "%sThe base table does not define any watermark strategy.", + EX_MSG_PREFIX)); + } + Schema.Builder schemaBuilder = Schema.newBuilder(); + buildUpdatedColumn( + schemaBuilder, + originTable, + (builder, column) -> builder.fromColumns(Collections.singletonList(column))); + buildUpdatedPrimaryKey(schemaBuilder, originTable, Function.identity()); + return schemaBuilder.build(); } // -------------------------------------------------------------------------------------------- @@ -216,7 +292,7 @@ public class AlterSchemaConverter { List<Function<ResolvedSchema, TableChange>> changeBuilders = new ArrayList<>(); SchemaConverter( - Schema originalSchema, + Schema originSchema, FlinkTypeFactory typeFactory, SqlValidator sqlValidator, Consumer<SqlTableConstraint> constraintValidator, @@ -229,13 +305,13 @@ public class AlterSchemaConverter { this.escapeExpressions = escapeExpressions; this.schemaResolver = schemaResolver; this.changesCollector = changesCollector; - populateColumnsFromSourceTable(originalSchema); - populatePrimaryKeyFromSourceTable(originalSchema); - populateWatermarkFromSourceTable(originalSchema); + populateColumnsFromSourceTable(originSchema); + populatePrimaryKeyFromSourceTable(originSchema); + populateWatermarkFromSourceTable(originSchema); } - private void populateColumnsFromSourceTable(Schema originalSchema) { - originalSchema + private void populateColumnsFromSourceTable(Schema originSchema) { + originSchema .getColumns() .forEach( column -> { @@ -245,15 +321,15 @@ public class AlterSchemaConverter { }); } - private void populatePrimaryKeyFromSourceTable(Schema originalSchema) { - if (originalSchema.getPrimaryKey().isPresent()) { - primaryKey = originalSchema.getPrimaryKey().get(); + private void populatePrimaryKeyFromSourceTable(Schema originSchema) { + if (originSchema.getPrimaryKey().isPresent()) { + primaryKey = originSchema.getPrimaryKey().get(); } } - private void populateWatermarkFromSourceTable(Schema originalSchema) { + private void populateWatermarkFromSourceTable(Schema originSchema) { for (Schema.UnresolvedWatermarkSpec sourceWatermarkSpec : - originalSchema.getWatermarkSpecs()) { + originSchema.getWatermarkSpecs()) { watermarkSpec = sourceWatermarkSpec; } } @@ -303,13 +379,13 @@ public class AlterSchemaConverter { private void updatePrimaryKeyNullability(String columnName) { Schema.UnresolvedColumn column = columns.get(columnName); if (column instanceof Schema.UnresolvedPhysicalColumn) { - AbstractDataType<?> originalType = + AbstractDataType<?> originType = ((Schema.UnresolvedPhysicalColumn) column).getDataType(); columns.put( columnName, new Schema.UnresolvedPhysicalColumn( columnName, - originalType.notNull(), + originType.notNull(), column.getComment().orElse(null))); } } @@ -455,7 +531,7 @@ public class AlterSchemaConverter { private static class AddSchemaConverter extends SchemaConverter { AddSchemaConverter( - Schema originalSchema, + Schema originSchema, FlinkTypeFactory typeFactory, SqlValidator sqlValidator, Consumer<SqlTableConstraint> constraintValidator, @@ -463,7 +539,7 @@ public class AlterSchemaConverter { SchemaResolver schemaResolver, List<TableChange> changeCollector) { super( - originalSchema, + originSchema, typeFactory, sqlValidator, constraintValidator, @@ -536,7 +612,7 @@ public class AlterSchemaConverter { private static class ModifySchemaConverter extends SchemaConverter { ModifySchemaConverter( - Schema originalSchema, + Schema originSchema, FlinkTypeFactory typeFactory, SqlValidator sqlValidator, Consumer<SqlTableConstraint> constraintValidator, @@ -544,7 +620,7 @@ public class AlterSchemaConverter { SchemaResolver schemaResolver, List<TableChange> tableChangeCollector) { super( - originalSchema, + originSchema, typeFactory, sqlValidator, constraintValidator, @@ -611,52 +687,151 @@ public class AlterSchemaConverter { private void validateColumnName( String originColumnName, String newColumnName, - List<String> tableColumns, - ResolvedSchema originResolvedSchema, + ResolvedSchema originSchemas, List<String> partitionKeys) { - // validate old column - if (!tableColumns.contains(originColumnName)) { + validateColumnName( + originColumnName, + originSchemas, + partitionKeys, + // fail the operation of renaming column, once the column derives a computed column + (referencedColumn, computedColumn) -> referencedColumn.contains(originColumnName)); + // validate new column + if (originSchemas.getColumn(newColumnName).isPresent()) { throw new ValidationException( String.format( - "Old column %s not found in table schema for RENAME COLUMN", - originColumnName)); + "%sThe column `%s` already existed in table schema.", + EX_MSG_PREFIX, newColumnName)); } + } - // validate new column - if (tableColumns.contains(newColumnName)) { + private void validateColumnName( + String columnToDrop, + ResolvedSchema originSchema, + List<String> partitionKeys, + Set<String> columnsToDrop) { + validateColumnName( + columnToDrop, + originSchema, + partitionKeys, + // fail the operation of dropping column, only if the column derives a computed + // column, and the computed column is not being dropped along with the origin column + (referencedColumn, computedColumn) -> + referencedColumn.contains(columnToDrop) + && !columnsToDrop.contains(computedColumn.getName())); + originSchema + .getPrimaryKey() + .ifPresent( + pk -> { + if (pk.getColumns().contains(columnToDrop)) { + throw new ValidationException( + String.format( + "%sThe column `%s` is used as the primary key.", + EX_MSG_PREFIX, columnToDrop)); + } + }); + } + + private void validateColumnName( + String columnToAlter, + ResolvedSchema originSchema, + List<String> partitionKeys, + BiFunction<Set<String>, Column.ComputedColumn, Boolean> computedColumnChecker) { + // validate origin column + Set<String> tableColumns = new HashSet<>(originSchema.getColumnNames()); + if (!tableColumns.contains(columnToAlter)) { throw new ValidationException( String.format( - "New column %s already existed in table schema for RENAME COLUMN", - newColumnName)); + "%sThe column `%s` does not exist in the base table.", + EX_MSG_PREFIX, columnToAlter)); } - // validate old column name isn't referred by computed column case - originResolvedSchema.getColumns().stream() + // validate origin column name isn't referred by computed column case + originSchema.getColumns().stream() .filter(column -> column instanceof Column.ComputedColumn) .forEach( column -> { Column.ComputedColumn computedColumn = (Column.ComputedColumn) column; Set<String> referencedColumn = ColumnReferenceFinder.findReferencedColumn( - computedColumn.getExpression(), tableColumns); - if (referencedColumn.contains(originColumnName)) { + computedColumn.getName(), originSchema); + if (computedColumnChecker.apply(referencedColumn, computedColumn)) { throw new ValidationException( String.format( - "Old column %s is referred by computed column %s, currently doesn't " - + "allow to rename column which is referred by computed column.", - originColumnName, + "%sThe column `%s` is referenced by computed column %s.", + EX_MSG_PREFIX, + columnToAlter, computedColumn.asSummaryString())); } }); - // validate partition keys doesn't contain the old column - if (partitionKeys.contains(originColumnName)) { + // validate partition keys doesn't contain the origin column + if (partitionKeys.contains(columnToAlter)) { + throw new ValidationException( + String.format( + "%sThe column `%s` is used as the partition keys.", + EX_MSG_PREFIX, columnToAlter)); + } + } + + private void validateWatermark(ResolvedCatalogTable originTable, String columnToAlter) { + // validate origin column isn't referenced by watermark + List<WatermarkSpec> watermarkSpecs = originTable.getResolvedSchema().getWatermarkSpecs(); + Set<String> referencedColumns = + ColumnReferenceFinder.findWatermarkReferencedColumn( + originTable.getResolvedSchema()); + Set<String> rowtimeAttributes = + originTable.getResolvedSchema().getWatermarkSpecs().stream() + .map(WatermarkSpec::getRowtimeAttribute) + .collect(Collectors.toSet()); + if (rowtimeAttributes.contains(columnToAlter) + || referencedColumns.contains(columnToAlter)) { throw new ValidationException( String.format( - "Can not rename column %s because it is used as the partition keys.", - originColumnName)); + "%sThe column `%s` is referenced by watermark expression %s.", + EX_MSG_PREFIX, columnToAlter, watermarkSpecs)); } } + private void buildUpdatedColumn( + Schema.Builder builder, + ResolvedCatalogTable originTable, + BiConsumer<Schema.Builder, Schema.UnresolvedColumn> columnConsumer) { + // build column + originTable + .getUnresolvedSchema() + .getColumns() + .forEach(column -> columnConsumer.accept(builder, column)); + } + + private void buildUpdatedPrimaryKey( + Schema.Builder builder, + ResolvedCatalogTable originTable, + Function<String, String> columnRenamer) { + originTable + .getUnresolvedSchema() + .getPrimaryKey() + .ifPresent( + pk -> { + List<String> originPrimaryKeyNames = pk.getColumnNames(); + String constrainName = pk.getConstraintName(); + List<String> newPrimaryKeyNames = + originPrimaryKeyNames.stream() + .map(columnRenamer) + .collect(Collectors.toList()); + builder.primaryKeyNamed(constrainName, newPrimaryKeyNames); + }); + } + + private void buildUpdatedWatermark(Schema.Builder builder, ResolvedCatalogTable originTable) { + originTable + .getUnresolvedSchema() + .getWatermarkSpecs() + .forEach( + watermarkSpec -> + builder.watermark( + watermarkSpec.getColumnName(), + watermarkSpec.getWatermarkExpression())); + } + private void buildNewColumnFromOriginColumn( Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) { if (originColumn instanceof Schema.UnresolvedComputedColumn) { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index 3ae34126d33..acb46f60fc9 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -25,7 +25,10 @@ import org.apache.flink.sql.parser.ddl.SqlAlterDatabase; import org.apache.flink.sql.parser.ddl.SqlAlterFunction; import org.apache.flink.sql.parser.ddl.SqlAlterTable; import org.apache.flink.sql.parser.ddl.SqlAlterTableCompact; +import org.apache.flink.sql.parser.ddl.SqlAlterTableDropColumn; import org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint; +import org.apache.flink.sql.parser.ddl.SqlAlterTableDropPrimaryKey; +import org.apache.flink.sql.parser.ddl.SqlAlterTableDropWatermark; import org.apache.flink.sql.parser.ddl.SqlAlterTableOptions; import org.apache.flink.sql.parser.ddl.SqlAlterTableRename; import org.apache.flink.sql.parser.ddl.SqlAlterTableRenameColumn; @@ -88,7 +91,6 @@ import org.apache.flink.sql.parser.dql.SqlUnloadModule; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -163,7 +165,6 @@ import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation; import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; import org.apache.flink.table.operations.ddl.AlterPartitionPropertiesOperation; import org.apache.flink.table.operations.ddl.AlterTableChangeOperation; -import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation; import org.apache.flink.table.operations.ddl.AlterTableRenameOperation; import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation; import org.apache.flink.table.operations.ddl.AlterViewAsOperation; @@ -488,6 +489,7 @@ public class SqlToOperationConverter { if (baseTable instanceof CatalogView) { throw new ValidationException("ALTER TABLE for a view is not allowed"); } + ResolvedCatalogTable resolvedCatalogTable = (ResolvedCatalogTable) baseTable; if (sqlAlterTable instanceof SqlAlterTableRename) { UnresolvedIdentifier newUnresolvedIdentifier = UnresolvedIdentifier.of( @@ -503,23 +505,45 @@ public class SqlToOperationConverter { } else if (sqlAlterTable instanceof SqlAlterTableReset) { return convertAlterTableReset( tableIdentifier, (CatalogTable) baseTable, (SqlAlterTableReset) sqlAlterTable); + } else if (sqlAlterTable instanceof SqlAlterTableDropColumn) { + return new AlterTableSchemaOperation( + tableIdentifier, + CatalogTable.of( + alterSchemaConverter.applySchemaChange( + (SqlAlterTableDropColumn) sqlAlterTable, resolvedCatalogTable), + resolvedCatalogTable.getComment(), + resolvedCatalogTable.getPartitionKeys(), + resolvedCatalogTable.getOptions())); + } else if (sqlAlterTable instanceof SqlAlterTableDropPrimaryKey) { + return new AlterTableSchemaOperation( + tableIdentifier, + CatalogTable.of( + alterSchemaConverter.applySchemaChange( + (SqlAlterTableDropPrimaryKey) sqlAlterTable, + resolvedCatalogTable), + resolvedCatalogTable.getComment(), + resolvedCatalogTable.getPartitionKeys(), + resolvedCatalogTable.getOptions())); } else if (sqlAlterTable instanceof SqlAlterTableDropConstraint) { - SqlAlterTableDropConstraint dropConstraint = - ((SqlAlterTableDropConstraint) sqlAlterTable); - String constraintName = dropConstraint.getConstraintName().getSimple(); - TableSchema oriSchema = - TableSchema.fromResolvedSchema( - baseTable - .getUnresolvedSchema() - .resolve(catalogManager.getSchemaResolver())); - if (!oriSchema - .getPrimaryKey() - .filter(pk -> pk.getName().equals(constraintName)) - .isPresent()) { - throw new ValidationException( - String.format("CONSTRAINT [%s] does not exist", constraintName)); - } - return new AlterTableDropConstraintOperation(tableIdentifier, constraintName); + return new AlterTableSchemaOperation( + tableIdentifier, + CatalogTable.of( + alterSchemaConverter.applySchemaChange( + (SqlAlterTableDropConstraint) sqlAlterTable, + resolvedCatalogTable), + resolvedCatalogTable.getComment(), + resolvedCatalogTable.getPartitionKeys(), + resolvedCatalogTable.getOptions())); + } else if (sqlAlterTable instanceof SqlAlterTableDropWatermark) { + return new AlterTableSchemaOperation( + tableIdentifier, + CatalogTable.of( + alterSchemaConverter.applySchemaChange( + (SqlAlterTableDropWatermark) sqlAlterTable, + resolvedCatalogTable), + resolvedCatalogTable.getComment(), + resolvedCatalogTable.getPartitionKeys(), + resolvedCatalogTable.getOptions())); } else if (sqlAlterTable instanceof SqlAddReplaceColumns) { return OperationConverterUtils.convertAddReplaceColumns( tableIdentifier, @@ -537,15 +561,14 @@ public class SqlToOperationConverter { (SqlAlterTableRenameColumn) sqlAlterTable; Schema newSchema = alterSchemaConverter.applySchemaChange( - sqlAlterTableRenameColumn, optionalCatalogTable.get()); - CatalogTable baseCatalogTable = (CatalogTable) baseTable; + sqlAlterTableRenameColumn, resolvedCatalogTable); return new AlterTableSchemaOperation( tableIdentifier, CatalogTable.of( newSchema, - baseCatalogTable.getComment(), - baseCatalogTable.getPartitionKeys(), - baseCatalogTable.getOptions())); + resolvedCatalogTable.getComment(), + resolvedCatalogTable.getPartitionKeys(), + resolvedCatalogTable.getOptions())); } else if (sqlAlterTable instanceof SqlAddPartitions) { List<CatalogPartitionSpec> specs = new ArrayList<>(); List<CatalogPartition> partitions = new ArrayList<>(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java new file mode 100644 index 00000000000..0444afea947 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinderTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.expressions; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link ColumnReferenceFinder}. */ +public class ColumnReferenceFinderTest extends TableTestBase { + + private final StreamTableTestUtil util = streamTestUtil(TableConfig.getDefault()); + private ResolvedSchema resolvedSchema; + + @BeforeEach + public void beforeEach() { + resolvedSchema = + util.testingTableEnv() + .getCatalogManager() + .getSchemaResolver() + .resolve( + Schema.newBuilder() + .columnByExpression("a", "b || '_001'") + .column("b", DataTypes.STRING()) + .columnByExpression("c", "d * e + 2") + .column("d", DataTypes.DOUBLE()) + .columnByMetadata("e", DataTypes.INT(), null, true) + .column( + "tuple", + DataTypes.ROW( + DataTypes.TIMESTAMP(3), DataTypes.INT())) + .columnByExpression("ts", "tuple.f0") + .watermark("ts", "ts - interval '5' day") + .build()); + } + + @Test + public void testFindReferencedColumn() { + assertThat(ColumnReferenceFinder.findReferencedColumn("b", resolvedSchema)) + .isEqualTo(Collections.emptySet()); + + assertThat(ColumnReferenceFinder.findReferencedColumn("a", resolvedSchema)) + .containsExactlyInAnyOrder("b"); + + assertThat(ColumnReferenceFinder.findReferencedColumn("c", resolvedSchema)) + .containsExactlyInAnyOrder("d", "e"); + + assertThat(ColumnReferenceFinder.findReferencedColumn("ts", resolvedSchema)) + .containsExactlyInAnyOrder("tuple"); + + assertThat(ColumnReferenceFinder.findWatermarkReferencedColumn(resolvedSchema)) + .containsExactlyInAnyOrder("ts"); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index 4dedea89f21..5ad6bce141a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -81,7 +81,6 @@ import org.apache.flink.table.operations.command.SetOperation; import org.apache.flink.table.operations.command.ShowJarsOperation; import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; import org.apache.flink.table.operations.ddl.AlterTableChangeOperation; -import org.apache.flink.table.operations.ddl.AlterTableDropConstraintOperation; import org.apache.flink.table.operations.ddl.AlterTableRenameOperation; import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation; import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation; @@ -1275,70 +1274,50 @@ public class SqlToOperationConverterTest { // rename pk column c Operation operation = parse("alter table tb1 rename c to c1"); assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class); - assertThat(((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema()) + assertThat(operation.asSummaryString()) .isEqualTo( - Schema.newBuilder() - .column("a", DataTypes.INT().notNull()) - .column("b", DataTypes.BIGINT().notNull()) - .column("c1", DataTypes.STRING().notNull()) - .withComment("column comment") - .columnByExpression("d", "a*(b+2 + a*b)") - .column( - "e", - DataTypes.ROW( - DataTypes.STRING(), - DataTypes.INT(), - DataTypes.ROW( - DataTypes.DOUBLE(), - DataTypes.ARRAY(DataTypes.FLOAT())))) - .columnByExpression("f", "e.f1 + e.f2.f0") - .columnByMetadata("g", DataTypes.STRING(), null, true) - .column("ts", DataTypes.TIMESTAMP(3)) - .withComment("just a comment") - .watermark("ts", "ts - interval '5' seconds") - .primaryKeyNamed("ct1", "a", "b", "c1") - .build()); + "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n" + + " `a` INT NOT NULL,\n" + + " `b` BIGINT NOT NULL,\n" + + " `c1` STRING NOT NULL COMMENT 'column comment',\n" + + " `d` AS [a*(b+2 + a*b)],\n" + + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n" + + " `f` AS [e.f1 + e.f2.f0],\n" + + " `g` METADATA VIRTUAL,\n" + + " `ts` TIMESTAMP(3) COMMENT 'just a comment',\n" + + " WATERMARK FOR `ts` AS [ts - interval '5' seconds],\n" + + " CONSTRAINT `ct1` PRIMARY KEY (`a`, `b`, `c1`) NOT ENFORCED\n" + + ")"); // rename computed column operation = parse("alter table tb1 rename f to f1"); assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class); - assertThat(((AlterTableSchemaOperation) operation).getCatalogTable().getUnresolvedSchema()) + assertThat(operation.asSummaryString()) .isEqualTo( - Schema.newBuilder() - .column("a", DataTypes.INT().notNull()) - .column("b", DataTypes.BIGINT().notNull()) - .column("c", DataTypes.STRING().notNull()) - .withComment("column comment") - .columnByExpression("d", "a*(b+2 + a*b)") - .column( - "e", - DataTypes.ROW( - DataTypes.STRING(), - DataTypes.INT(), - DataTypes.ROW( - DataTypes.DOUBLE(), - DataTypes.ARRAY(DataTypes.FLOAT())))) - .columnByExpression("f1", "e.f1 + e.f2.f0") - .columnByMetadata("g", DataTypes.STRING(), null, true) - .column("ts", DataTypes.TIMESTAMP(3)) - .withComment("just a comment") - .watermark("ts", "ts - interval '5' seconds") - .primaryKeyNamed("ct1", "a", "b", "c") - .build()); + "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n" + + " `a` INT NOT NULL,\n" + + " `b` BIGINT NOT NULL,\n" + + " `c` STRING NOT NULL COMMENT 'column comment',\n" + + " `d` AS [a*(b+2 + a*b)],\n" + + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n" + + " `f1` AS [e.f1 + e.f2.f0],\n" + + " `g` METADATA VIRTUAL,\n" + + " `ts` TIMESTAMP(3) COMMENT 'just a comment',\n" + + " WATERMARK FOR `ts` AS [ts - interval '5' seconds],\n" + + " CONSTRAINT `ct1` PRIMARY KEY (`a`, `b`, `c`) NOT ENFORCED\n" + + ")"); // rename column c that is used in a computed column assertThatThrownBy(() -> parse("alter table tb1 rename a to a1")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Old column a is referred by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b), " - + "currently doesn't allow to rename column which is referred by computed column."); + "The column `a` is referenced by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b)."); // rename column used in the watermark expression assertThatThrownBy(() -> parse("alter table tb1 rename ts to ts1")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Old column ts is referred by watermark expression WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds, " - + "currently doesn't allow to rename column which is referred by watermark expression."); + "The column `ts` is referenced by watermark expression [WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds]."); // rename nested column assertThatThrownBy(() -> parse("alter table tb1 rename e.f1 to e.f11")) @@ -1348,8 +1327,7 @@ public class SqlToOperationConverterTest { // rename column with duplicate name assertThatThrownBy(() -> parse("alter table tb1 rename c to a")) .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "New column a already existed in table schema for RENAME COLUMN"); + .hasMessageContaining("The column `a` already existed in table schema."); // rename column e test computed column expression is ApiExpression which doesn't implement // the equals method @@ -1374,29 +1352,152 @@ public class SqlToOperationConverterTest { assertThatThrownBy(() -> parse("alter table `cat1`.`db1`.`tb2` rename e to e1")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Old column e is referred by computed column `j` STRING AS upper(e), currently doesn't " - + "allow to rename column which is referred by computed column."); + "Failed to execute ALTER TABLE statement.\nThe column `e` is referenced by computed column `j` STRING AS upper(e)."); // rename column used as partition key assertThatThrownBy(() -> parse("alter table tb2 rename a to a1")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Can not rename column a because it is used as the partition keys"); + "Failed to execute ALTER TABLE statement.\nThe column `a` is used as the partition keys."); + } + + @Test + public void testFailedToAlterTableDropColumn() throws Exception { + prepareTable("tb1", false, false, true, 3); + + // drop a nonexistent column + assertThatThrownBy(() -> parse("alter table tb1 drop x")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("The column `x` does not exist in the base table."); + + assertThatThrownBy(() -> parse("alter table tb1 drop (g, x)")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("The column `x` does not exist in the base table."); + + // duplicate column + assertThatThrownBy(() -> parse("alter table tb1 drop (g, c, g)")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Duplicate column `g`."); + + // drop a nested column + assertThatThrownBy(() -> parse("alter table tb1 drop e.f2")) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Alter nested row type e.f2 is not supported yet."); + + // drop a column which generates a computed column + assertThatThrownBy(() -> parse("alter table tb1 drop a")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "The column `a` is referenced by computed column `d` BIGINT NOT NULL AS a*(b+2 + a*b)."); + + // drop a column which is pk + assertThatThrownBy(() -> parse("alter table tb1 drop c")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("The column `c` is used as the primary key."); + + // drop a column which defines watermark + assertThatThrownBy(() -> parse("alter table tb1 drop ts")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "The column `ts` is referenced by watermark expression [WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - interval '5' seconds]."); + } + + @Test + public void testAlterTableDropColumn() throws Exception { + prepareNonManagedTable(false); + // drop a single column + Operation operation = parse("alter table tb1 drop c"); + assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class); + assertThat(operation.asSummaryString()) + .isEqualTo( + "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n" + + " `a` INT NOT NULL,\n" + + " `b` BIGINT NOT NULL,\n" + + " `d` AS [a*(b+2 + a*b)],\n" + + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n" + + " `f` AS [e.f1 + e.f2.f0],\n" + + " `g` METADATA VIRTUAL,\n" + + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n" + + ")"); + + // drop computed column and referenced columns together + operation = parse("alter table tb1 drop (f, e, b, d)"); + assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class); + assertThat(operation.asSummaryString()) + .isEqualTo( + "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n" + + " `a` INT NOT NULL,\n" + + " `c` STRING NOT NULL COMMENT 'column comment',\n" + + " `g` METADATA VIRTUAL,\n" + + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n" + + ")"); + } + + @Test + public void testFailedToAlterTableDropConstraint() throws Exception { + prepareNonManagedTable("tb1", 0); + assertThatThrownBy(() -> parse("alter table tb1 drop primary key")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("The base table does not define any primary key."); + assertThatThrownBy(() -> parse("alter table tb1 drop constraint ct")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("The base table does not define any primary key."); + prepareNonManagedTable("tb2", 1); + assertThatThrownBy(() -> parse("alter table tb2 drop constraint ct2")) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "The base table does not define a primary key constraint named 'ct2'. Available constraint name: ['ct1']."); } @Test public void testAlterTableDropConstraint() throws Exception { prepareNonManagedTable(true); - // Test alter table add enforced + String expectedSummaryString = + "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n" + + " `a` INT NOT NULL,\n" + + " `b` BIGINT NOT NULL,\n" + + " `c` STRING NOT NULL COMMENT 'column comment',\n" + + " `d` AS [a*(b+2 + a*b)],\n" + + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n" + + " `f` AS [e.f1 + e.f2.f0],\n" + + " `g` METADATA VIRTUAL,\n" + + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n" + + ")"; + Operation operation = parse("alter table tb1 drop constraint ct1"); - assertThat(operation).isInstanceOf(AlterTableDropConstraintOperation.class); - AlterTableDropConstraintOperation dropConstraint = - (AlterTableDropConstraintOperation) operation; - assertThat(dropConstraint.asSummaryString()) - .isEqualTo("ALTER TABLE cat1.db1.tb1 DROP CONSTRAINT ct1"); - assertThatThrownBy(() -> parse("alter table tb1 drop constraint ct2")) + assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class); + assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString); + + operation = parse("alter table tb1 drop primary key"); + assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class); + assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString); + } + + @Test + public void testFailedToAlterTableDropWatermark() throws Exception { + prepareNonManagedTable("tb1", false); + assertThatThrownBy(() -> parse("alter table tb1 drop watermark")) .isInstanceOf(ValidationException.class) - .hasMessageContaining("CONSTRAINT [ct2] does not exist"); + .hasMessageContaining("The base table does not define any watermark strategy."); + } + + @Test + public void testAlterTableDropWatermark() throws Exception { + prepareNonManagedTable("tb1", true); + Operation operation = parse("alter table tb1 drop watermark"); + assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class); + assertThat(operation.asSummaryString()) + .isEqualTo( + "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n" + + " `a` INT NOT NULL,\n" + + " `b` BIGINT NOT NULL,\n" + + " `c` STRING NOT NULL COMMENT 'column comment',\n" + + " `d` AS [a*(b+2 + a*b)],\n" + + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n" + + " `f` AS [e.f1 + e.f2.f0],\n" + + " `g` METADATA VIRTUAL,\n" + + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n" + + ")"); } @Test @@ -1464,50 +1565,38 @@ public class SqlToOperationConverterTest { // try to add a column with duplicated name assertThatThrownBy(() -> parse("alter table tb1 add a bigint")) .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Try to add a column `a` which already exists in the table."); + .hasMessageContaining("Try to add a column `a` which already exists in the table."); // try to add multiple columns with duplicated column name assertThatThrownBy(() -> parse("alter table tb1 add (x array<string>, x string)")) .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Encounter duplicate column `x`."); + .hasMessageContaining("Encounter duplicate column `x`."); // refer to a nonexistent column assertThatThrownBy(() -> parse("alter table tb1 add x bigint after y")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Referenced column `y` by 'AFTER' does not exist in the table."); + "Referenced column `y` by 'AFTER' does not exist in the table."); // refer to a new added column that appears in the post position assertThatThrownBy(() -> parse("alter table tb1 add (x bigint after y, y string first)")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Referenced column `y` by 'AFTER' does not exist in the table."); + "Referenced column `y` by 'AFTER' does not exist in the table."); // add a computed column based on nonexistent column assertThatThrownBy(() -> parse("alter table tb1 add m as n + 2")) .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Invalid expression for computed column 'm'."); + .hasMessageContaining("Invalid expression for computed column 'm'."); // add a computed column based on another computed column assertThatThrownBy(() -> parse("alter table tb1 add (m as b * 2, n as m + 2)")) .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Invalid expression for computed column 'n'."); + .hasMessageContaining("Invalid expression for computed column 'n'."); // invalid expression assertThatThrownBy(() -> parse("alter table tb1 add (m as 'hello' || b)")) .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Invalid expression for computed column 'm'."); + .hasMessageContaining("Invalid expression for computed column 'm'."); // add an inner field to a nested row assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string)")) @@ -1517,9 +1606,7 @@ public class SqlToOperationConverterTest { // refer to a nested inner field assertThatThrownBy(() -> parse("alter table tb1 add (x string after e.f2)")) .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Alter nested row type is not supported yet."); + .hasMessageContaining("Alter nested row type is not supported yet."); assertThatThrownBy(() -> parse("alter table tb1 add (e.f3 string after e.f1)")) .isInstanceOf(UnsupportedOperationException.class) @@ -1647,8 +1734,7 @@ public class SqlToOperationConverterTest { assertThatThrownBy(() -> parse("alter table tb1 add primary key(c) not enforced")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "The base table has already defined the primary key constraint [`a`]. " + "The base table has already defined the primary key constraint [`a`]. " + "You might want to drop it before adding a new one."); assertThatThrownBy( @@ -1657,8 +1743,7 @@ public class SqlToOperationConverterTest { "alter table tb1 add x string not null primary key not enforced")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "The base table has already defined the primary key constraint [`a`]. " + "The base table has already defined the primary key constraint [`a`]. " + "You might want to drop it before adding a new one"); // the original table has composite pk @@ -1667,8 +1752,7 @@ public class SqlToOperationConverterTest { assertThatThrownBy(() -> parse("alter table tb2 add primary key(c) not enforced")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "The base table has already defined the primary key constraint [`a`, `b`]. " + "The base table has already defined the primary key constraint [`a`, `b`]. " + "You might want to drop it before adding a new one"); assertThatThrownBy( @@ -1677,8 +1761,7 @@ public class SqlToOperationConverterTest { "alter table tb2 add x string not null primary key not enforced")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "The base table has already defined the primary key constraint [`a`, `b`]. " + "The base table has already defined the primary key constraint [`a`, `b`]. " + "You might want to drop it before adding a new one"); // the original table does not define pk @@ -1687,8 +1770,7 @@ public class SqlToOperationConverterTest { // specify a nonexistent column as pk assertThatThrownBy(() -> parse("alter table tb3 add primary key (x) not enforced")) .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\nInvalid primary key 'PK_x'. Column 'x' does not exist."); + .hasMessageContaining("Invalid primary key 'PK_x'. Column 'x' does not exist."); // add unique constraint assertThatThrownBy(() -> parse("alter table tb3 add unique(b)")) @@ -1710,15 +1792,13 @@ public class SqlToOperationConverterTest { + " primary key (d, x) not enforced)")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Invalid primary key 'PK_d_x'. Column 'd' is not a physical column."); + "Invalid primary key 'PK_d_x'. Column 'd' is not a physical column."); // add a pk which is metadata column assertThatThrownBy(() -> parse("alter table tb3 add (primary key (g) not enforced)")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Invalid primary key 'PK_g'. Column 'g' is not a physical column."); + "Invalid primary key 'PK_g'. Column 'g' is not a physical column."); } @Test @@ -1799,16 +1879,14 @@ public class SqlToOperationConverterTest { assertThatThrownBy(() -> parse("alter table tb1 add watermark for x as x")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Invalid column name 'x' for rowtime attribute in watermark declaration. " + "Invalid column name 'x' for rowtime attribute in watermark declaration. " + "Available columns are: [a, b, c, d, e, f, g, ts]"); // add watermark with invalid type assertThatThrownBy(() -> parse("alter table tb1 add watermark for b as b")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Invalid data type of time field for watermark definition. " + "Invalid data type of time field for watermark definition. " + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), " + "the supported precision 'p' is from 0 to 3, but the time field type is BIGINT NOT NULL"); @@ -1818,9 +1896,7 @@ public class SqlToOperationConverterTest { parse( "alter table tb1 add (x row<f0 string, f1 timestamp(3)>, watermark for x.f1 as x.f1)")) .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Watermark strategy on nested column is not supported yet."); + .hasMessageContaining("Watermark strategy on nested column is not supported yet."); // add watermark to the table which already has watermark defined prepareNonManagedTable("tb2", true); @@ -1828,8 +1904,7 @@ public class SqlToOperationConverterTest { assertThatThrownBy(() -> parse("alter table tb2 add watermark for ts as ts")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "The base table has already defined the watermark strategy " + "The base table has already defined the watermark strategy " + "`ts` AS ts - interval '5' seconds. " + "You might want to drop it before adding a new one."); } @@ -1924,47 +1999,42 @@ public class SqlToOperationConverterTest { // modify duplicated column same assertThatThrownBy(() -> parse("alter table tb1 modify (b int, b array<int not null>)")) .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\nEncounter duplicate column `b`."); + .hasMessageContaining("Encounter duplicate column `b`."); // modify nonexistent column name assertThatThrownBy(() -> parse("alter table tb1 modify x bigint")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\nTry to modify a column `x` which does not exist in the table."); + "Try to modify a column `x` which does not exist in the table."); // refer to nonexistent column name assertThatThrownBy(() -> parse("alter table tb1 modify a bigint after x")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\nReferenced column `x` by 'AFTER' does not exist in the table."); + "Referenced column `x` by 'AFTER' does not exist in the table."); // modify physical columns which generates computed column assertThatThrownBy(() -> parse("alter table tb1 modify e array<int>")) .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\nInvalid expression for computed column 'f'."); + .hasMessageContaining("Invalid expression for computed column 'f'."); assertThatThrownBy(() -> parse("alter table tb1 modify a string")) .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\nInvalid expression for computed column 'd'."); + .hasMessageContaining("Invalid expression for computed column 'd'."); assertThatThrownBy(() -> parse("alter table tb1 modify b as a + 2")) .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\nInvalid expression for computed column 'd'."); + .hasMessageContaining("Invalid expression for computed column 'd'."); assertThatThrownBy(() -> parse("alter table tb1 modify (a timestamp(3), b multiset<int>)")) .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\nInvalid expression for computed column 'd'."); + .hasMessageContaining("Invalid expression for computed column 'd'."); // modify the rowtime field which defines watermark assertThatThrownBy(() -> parse("alter table tb1 modify ts int")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\nInvalid data type of time field for watermark definition. " + "Invalid data type of time field for watermark definition. " + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), " + "the supported precision 'p' is from 0 to 3, but the time field type is INT"); @@ -1974,12 +2044,12 @@ public class SqlToOperationConverterTest { assertThatThrownBy(() -> parse("alter table tb2 modify (d int, a as b + 2)")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\nInvalid primary key 'ct1'. Column 'a' is not a physical column."); + "Invalid primary key 'ct1'. Column 'a' is not a physical column."); assertThatThrownBy(() -> parse("alter table tb2 modify (d string, a int metadata virtual)")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\nInvalid primary key 'ct1'. Column 'a' is not a physical column."); + "Invalid primary key 'ct1'. Column 'a' is not a physical column."); // modify an inner field to a nested row assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string)")) @@ -1989,9 +2059,7 @@ public class SqlToOperationConverterTest { // refer to a nested inner field assertThatThrownBy(() -> parse("alter table tb2 modify (g string after e.f2)")) .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Alter nested row type is not supported yet."); + .hasMessageContaining("Alter nested row type is not supported yet."); assertThatThrownBy(() -> parse("alter table tb2 modify (e.f0 string after e.f1)")) .isInstanceOf(UnsupportedOperationException.class) @@ -2132,8 +2200,7 @@ public class SqlToOperationConverterTest { "alter table tb1 modify constraint ct primary key (b) not enforced")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "The base table does not define any primary key constraint. You might want to add a new one."); + "The base table does not define any primary key constraint. You might want to add a new one."); prepareNonManagedTable("tb2", 1); @@ -2143,9 +2210,7 @@ public class SqlToOperationConverterTest { parse( "alter table tb2 modify constraint ct primary key (x) not enforced")) .isInstanceOf(ValidationException.class) - .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Invalid primary key 'ct'. Column 'x' does not exist."); + .hasMessageContaining("Invalid primary key 'ct'. Column 'x' does not exist."); // specify computed column as pk assertThatThrownBy( @@ -2154,8 +2219,7 @@ public class SqlToOperationConverterTest { "alter table tb2 modify constraint ct primary key (d) not enforced")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Invalid primary key 'ct'. Column 'd' is not a physical column."); + "Invalid primary key 'ct'. Column 'd' is not a physical column."); // specify metadata column as pk assertThatThrownBy( @@ -2164,8 +2228,7 @@ public class SqlToOperationConverterTest { "alter table tb2 modify constraint ct primary key (g) not enforced")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\n" - + "Invalid primary key 'ct'. Column 'g' is not a physical column."); + "Invalid primary key 'ct'. Column 'g' is not a physical column."); } @Test @@ -2226,7 +2289,7 @@ public class SqlToOperationConverterTest { "alter table tb1 modify watermark for a as to_timestamp(a) - interval '1' minute")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\nThe base table does not define any watermark. You might want to add a new one."); + "The base table does not define any watermark. You might want to add a new one."); prepareNonManagedTable("tb2", true); @@ -2234,7 +2297,7 @@ public class SqlToOperationConverterTest { assertThatThrownBy(() -> parse("alter table tb2 modify watermark for a as a")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\nInvalid data type of time field for watermark definition. " + "Invalid data type of time field for watermark definition. " + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, " + "but the time field type is INT NOT NULL"); @@ -2244,7 +2307,7 @@ public class SqlToOperationConverterTest { "alter table tb2 modify watermark for c as to_timestamp(c) - interval '1' day")) .isInstanceOf(ValidationException.class) .hasMessageContaining( - "Failed to execute ALTER TABLE statement.\nInvalid data type of time field for watermark definition. " + "Invalid data type of time field for watermark definition. " + "The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, " + "but the time field type is STRING"); } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index a78bb5a07cc..0b2c47ce5ec 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -818,6 +818,106 @@ class TableEnvironmentTest { tableResult.collect()) } + @Test + def testAlterTableDropColumn(): Unit = { + val statement = + """ + |CREATE TABLE MyTable ( + | a BIGINT, + | b INT, + | d TIMESTAMP(3), + | e ROW<e0 STRING, e1 TIMESTAMP(3)>, + | WATERMARK FOR d AS d - INTERVAL '1' MINUTE + |) WITH ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + tableEnv.executeSql(statement) + + tableEnv.executeSql("ALTER TABLE MyTable DROP (e, a)") + + val expectedResult = util.Arrays.asList( + Row.of("b", "INT", Boolean.box(true), null, null, null), + Row.of( + "d", + "TIMESTAMP(3) *ROWTIME*", + Boolean.box(true), + null, + null, + "`d` - INTERVAL '1' MINUTE") + ) + val tableResult = tableEnv.executeSql("DESCRIBE MyTable") + assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind) + checkData(expectedResult.iterator(), tableResult.collect()) + } + + @Test + def testAlterTableDropConstraint(): Unit = { + val statement = + """ + |CREATE TABLE MyTable ( + | a BIGINT, + | b INT, + | d TIMESTAMP(3), + | e ROW<e0 STRING, e1 TIMESTAMP(3)>, + | CONSTRAINT ct PRIMARY KEY(a, b) NOT ENFORCED + |) WITH ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + tableEnv.executeSql(statement) + + tableEnv.executeSql("ALTER TABLE MyTable DROP CONSTRAINT ct") + + val expectedResult = util.Arrays.asList( + Row.of("a", "BIGINT", Boolean.box(false), null, null, null), + Row.of("b", "INT", Boolean.box(false), null, null, null), + Row.of("d", "TIMESTAMP(3)", Boolean.box(true), null, null, null), + Row.of("e", "ROW<`e0` STRING, `e1` TIMESTAMP(3)>", Boolean.box(true), null, null, null) + ) + val tableResult1 = tableEnv.executeSql("DESCRIBE MyTable") + assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult1.getResultKind) + checkData(expectedResult.iterator(), tableResult1.collect()) + + tableEnv.executeSql("ALTER TABLE MyTable ADD CONSTRAINT ct PRIMARY KEY(a) NOT ENFORCED") + tableEnv.executeSql("ALTER TABLE MyTable DROP PRIMARY KEY") + val tableResult2 = tableEnv.executeSql("DESCRIBE MyTable") + assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult2.getResultKind) + checkData(expectedResult.iterator(), tableResult2.collect()) + } + + @Test + def testAlterTableDropWatermark(): Unit = { + val statement = + """ + |CREATE TABLE MyTable ( + | a BIGINT, + | b INT, + | d TIMESTAMP(3), + | e ROW<e0 STRING, e1 TIMESTAMP(3)>, + | WATERMARK FOR d AS d - INTERVAL '1' MINUTE + |) WITH ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + tableEnv.executeSql(statement) + + tableEnv.executeSql("ALTER TABLE MyTable DROP WATERMARK") + + val expectedResult = util.Arrays.asList( + Row.of("a", "BIGINT", Boolean.box(true), null, null, null), + Row.of("b", "INT", Boolean.box(true), null, null, null), + Row.of("d", "TIMESTAMP(3)", Boolean.box(true), null, null, null), + Row.of("e", "ROW<`e0` STRING, `e1` TIMESTAMP(3)>", Boolean.box(true), null, null, null) + ) + val tableResult = tableEnv.executeSql("DESCRIBE MyTable") + assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind) + checkData(expectedResult.iterator(), tableResult.collect()) + } + @Test def testAlterTableCompactOnNonManagedTable(): Unit = { val statement =
