LadyForest commented on code in PR #21592: URL: https://github.com/apache/flink/pull/21592#discussion_r1060581278
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java: ########## @@ -229,6 +229,53 @@ static ModifyWatermark modify(WatermarkSpec newWatermarkSpec) { return new ModifyWatermark(newWatermarkSpec); } + /** + * A table change to drop column. + * + * <p>It is equal to the following statement: + * + * <pre> + * ALTER TABLE <table_name> DROP COLUMN <column_name> + * </pre> + * + * @param columnName the column to drop. + * @return a TableChange represents the modification. + */ + static DropColumn dropColumn(String columnName) { + return new DropColumn(columnName); + } + + /** + * A table change to drop watermark. + * + * <p>It is equal to the following statement: + * + * <pre> + * ALTER TABLE <table_name> DROP WATERMARK + * </pre> + * + * @return a TableChange represents the modification. + */ + static DropWatermark dropWatermark() { + return DropWatermark.INSTANCE; + } + + /** + * A table change to drop constraint. + * + * <p>It is equal to the following statement: + * + * <pre> + * ALTER TABLE <table_name> DROP CONSTRAINT <constraint_name> + * </pre> + * + * @param constraintName the constraint to drop. + * @return a TableChange represents the modification. + */ + static DropUniqueConstraint dropConstraint(String constraintName) { Review Comment: AFAIK, Flink does not support unique constraints, so `UniqueConstraint` sounds slightly confusing. What about `DropConstraint`? 
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java: ########## @@ -81,7 +82,11 @@ public static Set<String> findReferencedColumn(String columnName, ResolvedSchema public static Set<String> findWatermarkReferencedColumn(ResolvedSchema schema) { ColumnReferenceVisitor visitor = new ColumnReferenceVisitor(schema.getColumnNames()); return schema.getWatermarkSpecs().stream() - .flatMap(spec -> visitor.visit(spec.getWatermarkExpression()).stream()) + .flatMap( + spec -> + Stream.concat( + visitor.visit(spec.getWatermarkExpression()).stream(), + Stream.of(spec.getRowtimeAttribute()))) Review Comment: I'm curious under what condition the watermark expression does not contain the rowtime field? ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java: ########## @@ -133,6 +132,14 @@ public static Operation convertAddReplaceColumns( newProperties, catalogTable.getComment()); if (addReplaceColumns.isReplace()) { + // It's hard to determine how to decompose the ALTER TABLE REPLACE into multiple + // TableChanges. For example, with old schema <a INT, b INT, c INT> and the new schema + // <a INT, d INT>, there are multiple choices: + // plan 1: DROP COLUMN c, RENAME COLUMN b TO d; + // plan 2: DROP COLUMN b, RENAME COLUMN c TO d; + // So we don't translate with TableChanges here. One solution to solve this problem is + // the minimum edit distance, which tries to minimize the modification times, but it + // also can not give a unique answer. Review Comment: How about ```java // It's hard to define how to convert the ALTER TABLE REPLACE into multiple // TableChanges. For example, with old schema <a INT, b INT, c INT> and the new schema // <a INT, d INT>, there are multiple alternatives: // plan 1: DROP COLUMN c, RENAME COLUMN b TO d; // plan 2: DROP COLUMN b, RENAME COLUMN c TO d; // So we don't translate with TableChanges here. 
One workaround is // to minimize the edit distance, i.e., minimize the modification times, but it // still cannot provide a deterministic answer. ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java: ########## @@ -728,114 +724,166 @@ String getComment(SqlTableColumn column) { } } - // -------------------------------------------------------------------------------------------- + private static class ReferencesManager { + + /** Available columns in the table. */ + private final Set<String> columns; + + /** + * Mappings about the column refers which columns, e.g. column `b` refers to the column `a` + * in the expression "b as a+1". + */ + private final Map<String, Set<String>> columnToReferences; + + /** + * Reverse mappings about the column refers which columns, e.g. column `a` has the + * dependency of column `b` in the expression "b as a+1". + */ + private final Map<String, Set<String>> columnToDependencies; + + /** Primary keys defined on the table. */ + private final Set<String> primaryKeys; + + /** The name of the column watermark expression depends on. */ + private final Set<String> watermarkReferences; + + /** The name of the column partition keys contains. 
*/ + private final Set<String> partitionKeys; + + private ReferencesManager( + Set<String> columns, + Map<String, Set<String>> columnToReferences, + Map<String, Set<String>> columnToDependencies, + Set<String> primaryKeys, + Set<String> watermarkReferences, + Set<String> partitionKeys) { + this.columns = columns; + this.columnToReferences = columnToReferences; + this.columnToDependencies = columnToDependencies; + this.primaryKeys = primaryKeys; + this.watermarkReferences = watermarkReferences; + this.partitionKeys = partitionKeys; + } + + static ReferencesManager create(ResolvedCatalogTable catalogTable) { + Map<String, Set<String>> columnToReferences = new HashMap<>(); + Map<String, Set<String>> columnToDependencies = new HashMap<>(); + catalogTable.getResolvedSchema().getColumns().stream() + .filter(column -> column instanceof Column.ComputedColumn) + .forEach( + column -> { + Set<String> referencedColumns = + ColumnReferenceFinder.findReferencedColumn( + column.getName(), catalogTable.getResolvedSchema()); + for (String referencedColumn : referencedColumns) { + columnToReferences + .computeIfAbsent( + referencedColumn, key -> new HashSet<>()) + .add(column.getName()); + columnToDependencies + .computeIfAbsent( + column.getName(), key -> new HashSet<>()) + .add(referencedColumn); + } + }); - private void validateColumnName( - String oldColumnName, - String newColumnName, - ResolvedSchema oldSchema, - List<String> partitionKeys) { - validateColumnName( - oldColumnName, - oldSchema, - partitionKeys, - // fail the operation of renaming column, once the column derives a computed column - (referencedColumn, computedColumn) -> referencedColumn.contains(oldColumnName)); - // validate new column - if (oldSchema.getColumn(newColumnName).isPresent()) { - throw new ValidationException( - String.format( - "%sThe column `%s` already existed in table schema.", - EX_MSG_PREFIX, newColumnName)); + return new ReferencesManager( + new 
HashSet<>(catalogTable.getResolvedSchema().getColumnNames()), + columnToReferences, + columnToDependencies, + catalogTable + .getResolvedSchema() + .getPrimaryKey() + .map(constraint -> new HashSet<>(constraint.getColumns())) + .orElse(new HashSet<>()), + ColumnReferenceFinder.findWatermarkReferencedColumn( + catalogTable.getResolvedSchema()), + new HashSet<>(catalogTable.getPartitionKeys())); + } + + void dropColumn(String columnName) { + checkReferences(columnName); + if (primaryKeys.contains(columnName)) { + throw new ValidationException( + String.format( + "%sThe column %s is used as the primary key.", + EX_MSG_PREFIX, EncodingUtils.escapeIdentifier(columnName))); + } + + columnToDependencies + .getOrDefault(columnName, Collections.emptySet()) + .forEach( + referredColumn -> + columnToReferences.get(referredColumn).remove(columnName)); + columnToDependencies.remove(columnName); + columns.remove(columnName); } - } - private void validateColumnName( - String columnToDrop, - ResolvedSchema oldSchema, - List<String> partitionKeys, - Set<String> columnsToDrop) { - validateColumnName( - columnToDrop, - oldSchema, - partitionKeys, - // fail the operation of dropping column, only if the column derives a computed - // column, and the computed column is not being dropped along with the old column - (referencedColumn, computedColumn) -> - referencedColumn.contains(columnToDrop) - && !columnsToDrop.contains(computedColumn.getName())); - oldSchema - .getPrimaryKey() - .ifPresent( - pk -> { - if (pk.getColumns().contains(columnToDrop)) { - throw new ValidationException( - String.format( - "%sThe column `%s` is used as the primary key.", - EX_MSG_PREFIX, columnToDrop)); - } - }); - } + void renameColumn(String columnName, String newName) { + checkReferences(columnName); + if (columns.contains(newName)) { + throw new ValidationException( + String.format( + "%sThe column `%s` already existed in table schema.", + EX_MSG_PREFIX, newName)); + } - private void validateColumnName( - 
String columnToAlter, - ResolvedSchema oldSchema, - List<String> partitionKeys, - BiFunction<Set<String>, Column.ComputedColumn, Boolean> computedColumnChecker) { - // validate old column - Set<String> tableColumns = new HashSet<>(oldSchema.getColumnNames()); - if (!tableColumns.contains(columnToAlter)) { - throw new ValidationException( - String.format( - "%sThe column `%s` does not exist in the base table.", - EX_MSG_PREFIX, columnToAlter)); + columnToDependencies + .getOrDefault(columnName, Collections.emptySet()) + .forEach( + referredColumn -> { + columnToReferences.get(referredColumn).remove(columnName); + columnToReferences.get(referredColumn).add(newName); + }); + columnToDependencies.put(newName, columnToDependencies.remove(columnName)); + + columns.remove(columnName); + columns.add(newName); + + primaryKeys.remove(columnName); + primaryKeys.add(newName); } - // validate old column name isn't referred by computed column case - oldSchema.getColumns().stream() - .filter(column -> column instanceof Column.ComputedColumn) - .forEach( - column -> { - Column.ComputedColumn computedColumn = (Column.ComputedColumn) column; - Set<String> referencedColumn = - ColumnReferenceFinder.findReferencedColumn( - computedColumn.getName(), oldSchema); - if (computedColumnChecker.apply(referencedColumn, computedColumn)) { - throw new ValidationException( - String.format( - "%sThe column `%s` is referenced by computed column %s.", - EX_MSG_PREFIX, - columnToAlter, - computedColumn.asSummaryString())); - } - }); - // validate partition keys doesn't contain the old column - if (partitionKeys.contains(columnToAlter)) { - throw new ValidationException( - String.format( - "%sThe column `%s` is used as the partition keys.", - EX_MSG_PREFIX, columnToAlter)); + Set<String> lookupColumnDependencies(String columnName) { + return columnToDependencies.getOrDefault(columnName, Collections.emptySet()); } Review Comment: Nit: what about ```suggestion int getColumnDependencyCount(String 
columnName) { return columnToDependencies.getOrDefault(columnName, Collections.emptySet()).size(); } ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java: ########## @@ -728,114 +724,166 @@ String getComment(SqlTableColumn column) { } } - // -------------------------------------------------------------------------------------------- + private static class ReferencesManager { + + /** Available columns in the table. */ + private final Set<String> columns; + + /** + * Mappings about the column refers which columns, e.g. column `b` refers to the column `a` + * in the expression "b as a+1". + */ + private final Map<String, Set<String>> columnToReferences; + + /** + * Reverse mappings about the column refers which columns, e.g. column `a` has the + * dependency of column `b` in the expression "b as a+1". + */ + private final Map<String, Set<String>> columnToDependencies; + + /** Primary keys defined on the table. */ + private final Set<String> primaryKeys; + + /** The name of the column watermark expression depends on. */ + private final Set<String> watermarkReferences; + + /** The name of the column partition keys contains. 
*/ + private final Set<String> partitionKeys; + + private ReferencesManager( + Set<String> columns, + Map<String, Set<String>> columnToReferences, + Map<String, Set<String>> columnToDependencies, + Set<String> primaryKeys, + Set<String> watermarkReferences, + Set<String> partitionKeys) { + this.columns = columns; + this.columnToReferences = columnToReferences; + this.columnToDependencies = columnToDependencies; + this.primaryKeys = primaryKeys; + this.watermarkReferences = watermarkReferences; + this.partitionKeys = partitionKeys; + } + + static ReferencesManager create(ResolvedCatalogTable catalogTable) { + Map<String, Set<String>> columnToReferences = new HashMap<>(); + Map<String, Set<String>> columnToDependencies = new HashMap<>(); + catalogTable.getResolvedSchema().getColumns().stream() + .filter(column -> column instanceof Column.ComputedColumn) + .forEach( + column -> { + Set<String> referencedColumns = + ColumnReferenceFinder.findReferencedColumn( + column.getName(), catalogTable.getResolvedSchema()); + for (String referencedColumn : referencedColumns) { + columnToReferences + .computeIfAbsent( + referencedColumn, key -> new HashSet<>()) + .add(column.getName()); + columnToDependencies + .computeIfAbsent( + column.getName(), key -> new HashSet<>()) + .add(referencedColumn); + } + }); - private void validateColumnName( - String oldColumnName, - String newColumnName, - ResolvedSchema oldSchema, - List<String> partitionKeys) { - validateColumnName( - oldColumnName, - oldSchema, - partitionKeys, - // fail the operation of renaming column, once the column derives a computed column - (referencedColumn, computedColumn) -> referencedColumn.contains(oldColumnName)); - // validate new column - if (oldSchema.getColumn(newColumnName).isPresent()) { - throw new ValidationException( - String.format( - "%sThe column `%s` already existed in table schema.", - EX_MSG_PREFIX, newColumnName)); + return new ReferencesManager( + new 
HashSet<>(catalogTable.getResolvedSchema().getColumnNames()), + columnToReferences, + columnToDependencies, + catalogTable + .getResolvedSchema() + .getPrimaryKey() + .map(constraint -> new HashSet<>(constraint.getColumns())) + .orElse(new HashSet<>()), + ColumnReferenceFinder.findWatermarkReferencedColumn( + catalogTable.getResolvedSchema()), + new HashSet<>(catalogTable.getPartitionKeys())); + } + + void dropColumn(String columnName) { + checkReferences(columnName); + if (primaryKeys.contains(columnName)) { + throw new ValidationException( + String.format( + "%sThe column %s is used as the primary key.", + EX_MSG_PREFIX, EncodingUtils.escapeIdentifier(columnName))); + } + + columnToDependencies + .getOrDefault(columnName, Collections.emptySet()) + .forEach( + referredColumn -> + columnToReferences.get(referredColumn).remove(columnName)); + columnToDependencies.remove(columnName); + columns.remove(columnName); } - } - private void validateColumnName( - String columnToDrop, - ResolvedSchema oldSchema, - List<String> partitionKeys, - Set<String> columnsToDrop) { - validateColumnName( - columnToDrop, - oldSchema, - partitionKeys, - // fail the operation of dropping column, only if the column derives a computed - // column, and the computed column is not being dropped along with the old column - (referencedColumn, computedColumn) -> - referencedColumn.contains(columnToDrop) - && !columnsToDrop.contains(computedColumn.getName())); - oldSchema - .getPrimaryKey() - .ifPresent( - pk -> { - if (pk.getColumns().contains(columnToDrop)) { - throw new ValidationException( - String.format( - "%sThe column `%s` is used as the primary key.", - EX_MSG_PREFIX, columnToDrop)); - } - }); - } + void renameColumn(String columnName, String newName) { Review Comment: Nit: `columnName` -> `oldColumnName`? 
########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java: ########## @@ -1507,19 +1507,15 @@ public void testFailedToAlterTableDropWatermark() throws Exception { public void testAlterTableDropWatermark() throws Exception { prepareNonManagedTable("tb1", true); Operation operation = parse("alter table tb1 drop watermark"); - assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class); + assertThat(operation).isInstanceOf(AlterTableChangeOperation.class); assertThat(operation.asSummaryString()) - .isEqualTo( - "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n" - + " `a` INT NOT NULL,\n" - + " `b` BIGINT NOT NULL,\n" - + " `c` STRING NOT NULL COMMENT 'column comment',\n" - + " `d` AS [a*(b+2 + a*b)],\n" - + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n" - + " `f` AS [e.f1 + e.f2.f0],\n" - + " `g` METADATA VIRTUAL,\n" - + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n" - + ")"); + .isEqualTo("ALTER TABLE cat1.db1.tb1\n DROP WATERMARK"); + assertThat( + ((AlterTableChangeOperation) operation) + .getNewTable() + .getUnresolvedSchema() + .getWatermarkSpecs()) + .isEqualTo(Collections.emptyList()); Review Comment: ```suggestion .isEmpty(); ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java: ########## @@ -728,114 +724,166 @@ String getComment(SqlTableColumn column) { } } - // -------------------------------------------------------------------------------------------- + private static class ReferencesManager { + + /** Available columns in the table. */ + private final Set<String> columns; + + /** + * Mappings about the column refers which columns, e.g. column `b` refers to the column `a` + * in the expression "b as a+1". + */ + private final Map<String, Set<String>> columnToReferences; + + /** + * Reverse mappings about the column refers which columns, e.g. 
column `a` has the + * dependency of column `b` in the expression "b as a+1". + */ + private final Map<String, Set<String>> columnToDependencies; + + /** Primary keys defined on the table. */ + private final Set<String> primaryKeys; + + /** The name of the column watermark expression depends on. */ + private final Set<String> watermarkReferences; + + /** The name of the column partition keys contains. */ + private final Set<String> partitionKeys; + + private ReferencesManager( + Set<String> columns, + Map<String, Set<String>> columnToReferences, + Map<String, Set<String>> columnToDependencies, + Set<String> primaryKeys, + Set<String> watermarkReferences, + Set<String> partitionKeys) { + this.columns = columns; + this.columnToReferences = columnToReferences; + this.columnToDependencies = columnToDependencies; + this.primaryKeys = primaryKeys; + this.watermarkReferences = watermarkReferences; + this.partitionKeys = partitionKeys; + } + + static ReferencesManager create(ResolvedCatalogTable catalogTable) { + Map<String, Set<String>> columnToReferences = new HashMap<>(); + Map<String, Set<String>> columnToDependencies = new HashMap<>(); + catalogTable.getResolvedSchema().getColumns().stream() + .filter(column -> column instanceof Column.ComputedColumn) + .forEach( + column -> { + Set<String> referencedColumns = + ColumnReferenceFinder.findReferencedColumn( + column.getName(), catalogTable.getResolvedSchema()); + for (String referencedColumn : referencedColumns) { + columnToReferences + .computeIfAbsent( + referencedColumn, key -> new HashSet<>()) + .add(column.getName()); + columnToDependencies + .computeIfAbsent( + column.getName(), key -> new HashSet<>()) + .add(referencedColumn); + } + }); - private void validateColumnName( - String oldColumnName, - String newColumnName, - ResolvedSchema oldSchema, - List<String> partitionKeys) { - validateColumnName( - oldColumnName, - oldSchema, - partitionKeys, - // fail the operation of renaming column, once the column derives a 
computed column - (referencedColumn, computedColumn) -> referencedColumn.contains(oldColumnName)); - // validate new column - if (oldSchema.getColumn(newColumnName).isPresent()) { - throw new ValidationException( - String.format( - "%sThe column `%s` already existed in table schema.", - EX_MSG_PREFIX, newColumnName)); + return new ReferencesManager( + new HashSet<>(catalogTable.getResolvedSchema().getColumnNames()), + columnToReferences, + columnToDependencies, + catalogTable + .getResolvedSchema() + .getPrimaryKey() + .map(constraint -> new HashSet<>(constraint.getColumns())) + .orElse(new HashSet<>()), + ColumnReferenceFinder.findWatermarkReferencedColumn( + catalogTable.getResolvedSchema()), + new HashSet<>(catalogTable.getPartitionKeys())); + } + + void dropColumn(String columnName) { + checkReferences(columnName); + if (primaryKeys.contains(columnName)) { + throw new ValidationException( + String.format( + "%sThe column %s is used as the primary key.", + EX_MSG_PREFIX, EncodingUtils.escapeIdentifier(columnName))); + } + + columnToDependencies + .getOrDefault(columnName, Collections.emptySet()) + .forEach( + referredColumn -> + columnToReferences.get(referredColumn).remove(columnName)); + columnToDependencies.remove(columnName); + columns.remove(columnName); } - } - private void validateColumnName( - String columnToDrop, - ResolvedSchema oldSchema, - List<String> partitionKeys, - Set<String> columnsToDrop) { - validateColumnName( - columnToDrop, - oldSchema, - partitionKeys, - // fail the operation of dropping column, only if the column derives a computed - // column, and the computed column is not being dropped along with the old column - (referencedColumn, computedColumn) -> - referencedColumn.contains(columnToDrop) - && !columnsToDrop.contains(computedColumn.getName())); - oldSchema - .getPrimaryKey() - .ifPresent( - pk -> { - if (pk.getColumns().contains(columnToDrop)) { - throw new ValidationException( - String.format( - "%sThe column `%s` is used as the 
primary key.", - EX_MSG_PREFIX, columnToDrop)); - } - }); - } + void renameColumn(String columnName, String newName) { + checkReferences(columnName); + if (columns.contains(newName)) { + throw new ValidationException( + String.format( + "%sThe column `%s` already existed in table schema.", + EX_MSG_PREFIX, newName)); + } - private void validateColumnName( - String columnToAlter, - ResolvedSchema oldSchema, - List<String> partitionKeys, - BiFunction<Set<String>, Column.ComputedColumn, Boolean> computedColumnChecker) { - // validate old column - Set<String> tableColumns = new HashSet<>(oldSchema.getColumnNames()); - if (!tableColumns.contains(columnToAlter)) { - throw new ValidationException( - String.format( - "%sThe column `%s` does not exist in the base table.", - EX_MSG_PREFIX, columnToAlter)); + columnToDependencies + .getOrDefault(columnName, Collections.emptySet()) + .forEach( + referredColumn -> { + columnToReferences.get(referredColumn).remove(columnName); + columnToReferences.get(referredColumn).add(newName); + }); + columnToDependencies.put(newName, columnToDependencies.remove(columnName)); + + columns.remove(columnName); + columns.add(newName); + + primaryKeys.remove(columnName); Review Comment: I don't think we can directly add `newColumnName` to `primaryKeys`. 
```suggestion if (primaryKeys.remove(columnName)) { primaryKeys.add(newName); } ``` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java: ########## @@ -1474,25 +1470,29 @@ public void testFailedToAlterTableDropConstraint() throws Exception { @Test public void testAlterTableDropConstraint() throws Exception { prepareNonManagedTable(true); - String expectedSummaryString = - "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n" - + " `a` INT NOT NULL,\n" - + " `b` BIGINT NOT NULL,\n" - + " `c` STRING NOT NULL COMMENT 'column comment',\n" - + " `d` AS [a*(b+2 + a*b)],\n" - + " `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n" - + " `f` AS [e.f1 + e.f2.f0],\n" - + " `g` METADATA VIRTUAL,\n" - + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n" - + ")"; + String expectedSummaryString = "ALTER TABLE cat1.db1.tb1\n DROP CONSTRAINT ct1"; Operation operation = parse("alter table tb1 drop constraint ct1"); - assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class); + assertThat(operation).isInstanceOf(AlterTableChangeOperation.class); assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString); + assertThat( + ((AlterTableChangeOperation) operation) + .getNewTable() + .getUnresolvedSchema() + .getPrimaryKey() + .isPresent()) Review Comment: ```suggestion isNotPresent() ``` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java: ########## @@ -1474,25 +1470,29 @@ public void testFailedToAlterTableDropConstraint() throws Exception { @Test public void testAlterTableDropConstraint() throws Exception { prepareNonManagedTable(true); - String expectedSummaryString = - "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n" - + " `a` INT NOT NULL,\n" - + " `b` BIGINT NOT NULL,\n" - + " `c` STRING NOT NULL COMMENT 'column comment',\n" - + " `d` AS [a*(b+2 + a*b)],\n" - + " `e` ROW<`f0` STRING, `f1` INT, `f2` 
ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n" - + " `f` AS [e.f1 + e.f2.f0],\n" - + " `g` METADATA VIRTUAL,\n" - + " `ts` TIMESTAMP(3) COMMENT 'just a comment'\n" - + ")"; + String expectedSummaryString = "ALTER TABLE cat1.db1.tb1\n DROP CONSTRAINT ct1"; Operation operation = parse("alter table tb1 drop constraint ct1"); - assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class); + assertThat(operation).isInstanceOf(AlterTableChangeOperation.class); assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString); + assertThat( + ((AlterTableChangeOperation) operation) + .getNewTable() + .getUnresolvedSchema() + .getPrimaryKey() + .isPresent()) + .isEqualTo(false); operation = parse("alter table tb1 drop primary key"); - assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class); + assertThat(operation).isInstanceOf(AlterTableChangeOperation.class); assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString); + assertThat( + ((AlterTableChangeOperation) operation) + .getNewTable() + .getUnresolvedSchema() + .getPrimaryKey() + .isPresent()) + .isEqualTo(false); Review Comment: Nit ```suggestion assertThat( ((AlterTableChangeOperation) operation) .getNewTable() .getUnresolvedSchema() .getPrimaryKey() ) .isNotPresent(); ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java: ########## @@ -728,114 +724,166 @@ String getComment(SqlTableColumn column) { } } - // -------------------------------------------------------------------------------------------- + private static class ReferencesManager { + + /** Available columns in the table. */ + private final Set<String> columns; + + /** + * Mappings about the column refers which columns, e.g. column `b` refers to the column `a` + * in the expression "b as a+1". + */ + private final Map<String, Set<String>> columnToReferences; + + /** + * Reverse mappings about the column refers which columns, e.g. 
column `a` has the + * dependency of column `b` in the expression "b as a+1". + */ + private final Map<String, Set<String>> columnToDependencies; + + /** Primary keys defined on the table. */ + private final Set<String> primaryKeys; + + /** The name of the column watermark expression depends on. */ + private final Set<String> watermarkReferences; + + /** The name of the column partition keys contains. */ + private final Set<String> partitionKeys; + + private ReferencesManager( + Set<String> columns, + Map<String, Set<String>> columnToReferences, + Map<String, Set<String>> columnToDependencies, + Set<String> primaryKeys, + Set<String> watermarkReferences, + Set<String> partitionKeys) { + this.columns = columns; + this.columnToReferences = columnToReferences; + this.columnToDependencies = columnToDependencies; + this.primaryKeys = primaryKeys; + this.watermarkReferences = watermarkReferences; + this.partitionKeys = partitionKeys; + } + + static ReferencesManager create(ResolvedCatalogTable catalogTable) { + Map<String, Set<String>> columnToReferences = new HashMap<>(); + Map<String, Set<String>> columnToDependencies = new HashMap<>(); + catalogTable.getResolvedSchema().getColumns().stream() + .filter(column -> column instanceof Column.ComputedColumn) + .forEach( + column -> { + Set<String> referencedColumns = + ColumnReferenceFinder.findReferencedColumn( + column.getName(), catalogTable.getResolvedSchema()); + for (String referencedColumn : referencedColumns) { + columnToReferences + .computeIfAbsent( + referencedColumn, key -> new HashSet<>()) + .add(column.getName()); + columnToDependencies + .computeIfAbsent( + column.getName(), key -> new HashSet<>()) + .add(referencedColumn); + } + }); - private void validateColumnName( - String oldColumnName, - String newColumnName, - ResolvedSchema oldSchema, - List<String> partitionKeys) { - validateColumnName( - oldColumnName, - oldSchema, - partitionKeys, - // fail the operation of renaming column, once the column derives a 
computed column - (referencedColumn, computedColumn) -> referencedColumn.contains(oldColumnName)); - // validate new column - if (oldSchema.getColumn(newColumnName).isPresent()) { - throw new ValidationException( - String.format( - "%sThe column `%s` already existed in table schema.", - EX_MSG_PREFIX, newColumnName)); + return new ReferencesManager( + new HashSet<>(catalogTable.getResolvedSchema().getColumnNames()), + columnToReferences, + columnToDependencies, + catalogTable + .getResolvedSchema() + .getPrimaryKey() + .map(constraint -> new HashSet<>(constraint.getColumns())) + .orElse(new HashSet<>()), + ColumnReferenceFinder.findWatermarkReferencedColumn( + catalogTable.getResolvedSchema()), + new HashSet<>(catalogTable.getPartitionKeys())); + } + + void dropColumn(String columnName) { + checkReferences(columnName); + if (primaryKeys.contains(columnName)) { + throw new ValidationException( + String.format( + "%sThe column %s is used as the primary key.", + EX_MSG_PREFIX, EncodingUtils.escapeIdentifier(columnName))); + } + + columnToDependencies + .getOrDefault(columnName, Collections.emptySet()) + .forEach( + referredColumn -> + columnToReferences.get(referredColumn).remove(columnName)); + columnToDependencies.remove(columnName); + columns.remove(columnName); } - } - private void validateColumnName( - String columnToDrop, - ResolvedSchema oldSchema, - List<String> partitionKeys, - Set<String> columnsToDrop) { - validateColumnName( - columnToDrop, - oldSchema, - partitionKeys, - // fail the operation of dropping column, only if the column derives a computed - // column, and the computed column is not being dropped along with the old column - (referencedColumn, computedColumn) -> - referencedColumn.contains(columnToDrop) - && !columnsToDrop.contains(computedColumn.getName())); - oldSchema - .getPrimaryKey() - .ifPresent( - pk -> { - if (pk.getColumns().contains(columnToDrop)) { - throw new ValidationException( - String.format( - "%sThe column `%s` is used as the 
primary key.", - EX_MSG_PREFIX, columnToDrop)); - } - }); - } + void renameColumn(String columnName, String newName) { + checkReferences(columnName); + if (columns.contains(newName)) { + throw new ValidationException( + String.format( + "%sThe column `%s` already existed in table schema.", + EX_MSG_PREFIX, newName)); + } - private void validateColumnName( - String columnToAlter, - ResolvedSchema oldSchema, - List<String> partitionKeys, - BiFunction<Set<String>, Column.ComputedColumn, Boolean> computedColumnChecker) { - // validate old column - Set<String> tableColumns = new HashSet<>(oldSchema.getColumnNames()); - if (!tableColumns.contains(columnToAlter)) { - throw new ValidationException( - String.format( - "%sThe column `%s` does not exist in the base table.", - EX_MSG_PREFIX, columnToAlter)); + columnToDependencies + .getOrDefault(columnName, Collections.emptySet()) + .forEach( + referredColumn -> { + columnToReferences.get(referredColumn).remove(columnName); + columnToReferences.get(referredColumn).add(newName); + }); + columnToDependencies.put(newName, columnToDependencies.remove(columnName)); + + columns.remove(columnName); + columns.add(newName); + + primaryKeys.remove(columnName); Review Comment: Btw, I don't think we need to update `primaryKeys` here because `buildUpdatedPrimaryKey` will compute the primary key name. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java: ########## @@ -728,114 +724,166 @@ String getComment(SqlTableColumn column) { } } - // -------------------------------------------------------------------------------------------- + private static class ReferencesManager { + + /** Available columns in the table. */ + private final Set<String> columns; + + /** + * Mappings about the column refers which columns, e.g. column `b` refers to the column `a` + * in the expression "b as a+1". 
+ */ + private final Map<String, Set<String>> columnToReferences; + + /** + * Reverse mappings about the column refers which columns, e.g. column `a` has the + * dependency of column `b` in the expression "b as a+1". + */ + private final Map<String, Set<String>> columnToDependencies; + + /** Primary keys defined on the table. */ + private final Set<String> primaryKeys; + + /** The name of the column watermark expression depends on. */ + private final Set<String> watermarkReferences; + + /** The name of the column partition keys contains. */ + private final Set<String> partitionKeys; + + private ReferencesManager( + Set<String> columns, + Map<String, Set<String>> columnToReferences, + Map<String, Set<String>> columnToDependencies, + Set<String> primaryKeys, + Set<String> watermarkReferences, + Set<String> partitionKeys) { + this.columns = columns; + this.columnToReferences = columnToReferences; + this.columnToDependencies = columnToDependencies; + this.primaryKeys = primaryKeys; + this.watermarkReferences = watermarkReferences; + this.partitionKeys = partitionKeys; + } + + static ReferencesManager create(ResolvedCatalogTable catalogTable) { + Map<String, Set<String>> columnToReferences = new HashMap<>(); + Map<String, Set<String>> columnToDependencies = new HashMap<>(); + catalogTable.getResolvedSchema().getColumns().stream() + .filter(column -> column instanceof Column.ComputedColumn) + .forEach( + column -> { + Set<String> referencedColumns = + ColumnReferenceFinder.findReferencedColumn( + column.getName(), catalogTable.getResolvedSchema()); + for (String referencedColumn : referencedColumns) { + columnToReferences + .computeIfAbsent( + referencedColumn, key -> new HashSet<>()) + .add(column.getName()); + columnToDependencies + .computeIfAbsent( + column.getName(), key -> new HashSet<>()) + .add(referencedColumn); + } + }); - private void validateColumnName( - String oldColumnName, - String newColumnName, - ResolvedSchema oldSchema, - List<String> partitionKeys) { 
- validateColumnName( - oldColumnName, - oldSchema, - partitionKeys, - // fail the operation of renaming column, once the column derives a computed column - (referencedColumn, computedColumn) -> referencedColumn.contains(oldColumnName)); - // validate new column - if (oldSchema.getColumn(newColumnName).isPresent()) { - throw new ValidationException( - String.format( - "%sThe column `%s` already existed in table schema.", - EX_MSG_PREFIX, newColumnName)); + return new ReferencesManager( + new HashSet<>(catalogTable.getResolvedSchema().getColumnNames()), + columnToReferences, + columnToDependencies, + catalogTable + .getResolvedSchema() + .getPrimaryKey() + .map(constraint -> new HashSet<>(constraint.getColumns())) + .orElse(new HashSet<>()), + ColumnReferenceFinder.findWatermarkReferencedColumn( + catalogTable.getResolvedSchema()), + new HashSet<>(catalogTable.getPartitionKeys())); + } + + void dropColumn(String columnName) { + checkReferences(columnName); + if (primaryKeys.contains(columnName)) { + throw new ValidationException( + String.format( + "%sThe column %s is used as the primary key.", + EX_MSG_PREFIX, EncodingUtils.escapeIdentifier(columnName))); + } + + columnToDependencies + .getOrDefault(columnName, Collections.emptySet()) + .forEach( + referredColumn -> + columnToReferences.get(referredColumn).remove(columnName)); + columnToDependencies.remove(columnName); + columns.remove(columnName); } - } - private void validateColumnName( - String columnToDrop, - ResolvedSchema oldSchema, - List<String> partitionKeys, - Set<String> columnsToDrop) { - validateColumnName( - columnToDrop, - oldSchema, - partitionKeys, - // fail the operation of dropping column, only if the column derives a computed - // column, and the computed column is not being dropped along with the old column - (referencedColumn, computedColumn) -> - referencedColumn.contains(columnToDrop) - && !columnsToDrop.contains(computedColumn.getName())); - oldSchema - .getPrimaryKey() - .ifPresent( - pk 
-> { - if (pk.getColumns().contains(columnToDrop)) { - throw new ValidationException( - String.format( - "%sThe column `%s` is used as the primary key.", - EX_MSG_PREFIX, columnToDrop)); - } - }); - } + void renameColumn(String columnName, String newName) { + checkReferences(columnName); + if (columns.contains(newName)) { + throw new ValidationException( + String.format( + "%sThe column `%s` already existed in table schema.", + EX_MSG_PREFIX, newName)); + } - private void validateColumnName( - String columnToAlter, - ResolvedSchema oldSchema, - List<String> partitionKeys, - BiFunction<Set<String>, Column.ComputedColumn, Boolean> computedColumnChecker) { - // validate old column - Set<String> tableColumns = new HashSet<>(oldSchema.getColumnNames()); - if (!tableColumns.contains(columnToAlter)) { - throw new ValidationException( - String.format( - "%sThe column `%s` does not exist in the base table.", - EX_MSG_PREFIX, columnToAlter)); + columnToDependencies + .getOrDefault(columnName, Collections.emptySet()) + .forEach( + referredColumn -> { + columnToReferences.get(referredColumn).remove(columnName); + columnToReferences.get(referredColumn).add(newName); + }); + columnToDependencies.put(newName, columnToDependencies.remove(columnName)); + + columns.remove(columnName); + columns.add(newName); Review Comment: Do we need to update `columnToDependencies` and `columnToReferences`? `ALTER TABLE RENAME` is not a bulk operation, so no further calls will read values from them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org