LadyForest commented on code in PR #21592:
URL: https://github.com/apache/flink/pull/21592#discussion_r1060581278


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -229,6 +229,53 @@ static ModifyWatermark modify(WatermarkSpec newWatermarkSpec) {
         return new ModifyWatermark(newWatermarkSpec);
     }
 
+    /**
+     * A table change to drop column.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; DROP COLUMN &lt;column_name&gt;
+     * </pre>
+     *
+     * @param columnName the column to drop.
+     * @return a TableChange represents the modification.
+     */
+    static DropColumn dropColumn(String columnName) {
+        return new DropColumn(columnName);
+    }
+
+    /**
+     * A table change to drop watermark.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; DROP WATERMARK
+     * </pre>
+     *
+     * @return a TableChange represents the modification.
+     */
+    static DropWatermark dropWatermark() {
+        return DropWatermark.INSTANCE;
+    }
+
+    /**
+     * A table change to drop constraint.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER TABLE &lt;table_name&gt; DROP CONSTRAINT &lt;constraint_name&gt;
+     * </pre>
+     *
+     * @param constraintName the constraint to drop.
+     * @return a TableChange represents the modification.
+     */
+    static DropUniqueConstraint dropConstraint(String constraintName) {

Review Comment:
   AFAIK Flink does not support unique constraints, so `UniqueConstraint` sounds
   slightly confusing. What about `DropConstraint`?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/ColumnReferenceFinder.java:
##########
@@ -81,7 +82,11 @@ public static Set<String> findReferencedColumn(String columnName, ResolvedSchema
     public static Set<String> findWatermarkReferencedColumn(ResolvedSchema schema) {
         ColumnReferenceVisitor visitor = new ColumnReferenceVisitor(schema.getColumnNames());
         return schema.getWatermarkSpecs().stream()
-                .flatMap(spec -> visitor.visit(spec.getWatermarkExpression()).stream())
+                .flatMap(
+                        spec ->
+                                Stream.concat(
+                                        visitor.visit(spec.getWatermarkExpression()).stream(),
+                                        Stream.of(spec.getRowtimeAttribute())))

Review Comment:
   Under what conditions would the watermark expression not reference the rowtime
   field?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -133,6 +132,14 @@ public static Operation convertAddReplaceColumns(
                         newProperties,
                         catalogTable.getComment());
         if (addReplaceColumns.isReplace()) {
+            // It's hard to determine how to decompose the ALTER TABLE REPLACE into multiple
+            // TableChanges. For example, with old schema <a INT, b INT, c INT> and the new schema
+            // <a INT, d INT>, there are multiple choices:
+            // plan 1: DROP COLUMN c, RENAME COLUMN b TO d;
+            // plan 2: DROP COLUMN b, RENAME COLUMN c TO d;
+            // So we don't translate with TableChanges here. One solution to solve this problem is
+            // the minimum edit distance, which tries to minimize the modification times, but it
+            // also can not give a unique answer.

Review Comment:
   How about
   ```java
               // It's hard to define how to convert the ALTER TABLE REPLACE into multiple
               // TableChanges. For example, with old schema <a INT, b INT, c INT> and the new schema
               // <a INT, d INT>, there are multiple alternatives:
               // plan 1: DROP COLUMN c, RENAME COLUMN b TO d;
               // plan 2: DROP COLUMN b, RENAME COLUMN c TO d;
               // So we don't translate with TableChanges here. One workaround is
               // to minimize the edit distance, i.e., minimize the number of modifications, but it
               // still cannot provide a deterministic answer.
   ```
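
The ambiguity described in this code comment can be checked mechanically. Below is a minimal sketch, assuming a schema is modeled as an ordered list of column names only (types are ignored because every column in the example is INT). `ReplaceAmbiguitySketch`, `dropColumn`, `renameColumn`, `plan1` and `plan2` are hypothetical helpers for illustration and are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReplaceAmbiguitySketch {

    /** Hypothetical helper: returns a copy of the schema without the given column. */
    public static List<String> dropColumn(List<String> schema, String name) {
        List<String> result = new ArrayList<>(schema);
        result.remove(name);
        return result;
    }

    /** Hypothetical helper: returns a copy of the schema with one column renamed in place. */
    public static List<String> renameColumn(List<String> schema, String oldName, String newName) {
        List<String> result = new ArrayList<>(schema);
        int index = result.indexOf(oldName);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown column: " + oldName);
        }
        result.set(index, newName);
        return result;
    }

    /** plan 1: DROP COLUMN c, RENAME COLUMN b TO d. */
    public static List<String> plan1(List<String> schema) {
        return renameColumn(dropColumn(schema, "c"), "b", "d");
    }

    /** plan 2: DROP COLUMN b, RENAME COLUMN c TO d. */
    public static List<String> plan2(List<String> schema) {
        return renameColumn(dropColumn(schema, "b"), "c", "d");
    }

    public static void main(String[] args) {
        List<String> oldSchema = Arrays.asList("a", "b", "c");
        // Both plans end at the same schema, so the target schema alone
        // does not determine which sequence of changes was intended.
        System.out.println(plan1(oldSchema)); // [a, d]
        System.out.println(plan2(oldSchema)); // [a, d]
    }
}
```

Both plans use two changes, which is consistent with the remark that minimizing the edit distance does not single out one answer either.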



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -728,114 +724,166 @@ String getComment(SqlTableColumn column) {
         }
     }
 
-    // 
--------------------------------------------------------------------------------------------
+    private static class ReferencesManager {
+
+        /** Available columns in the table. */
+        private final Set<String> columns;
+
+        /**
+         * Mappings about the column refers which columns, e.g. column `b` 
refers to the column `a`
+         * in the expression "b as a+1".
+         */
+        private final Map<String, Set<String>> columnToReferences;
+
+        /**
+         * Reverse mappings about the column refers which columns, e.g. column 
`a` has the
+         * dependency of column `b` in the expression "b as a+1".
+         */
+        private final Map<String, Set<String>> columnToDependencies;
+
+        /** Primary keys defined on the table. */
+        private final Set<String> primaryKeys;
+
+        /** The name of the column watermark expression depends on. */
+        private final Set<String> watermarkReferences;
+
+        /** The name of the column partition keys contains. */
+        private final Set<String> partitionKeys;
+
+        private ReferencesManager(
+                Set<String> columns,
+                Map<String, Set<String>> columnToReferences,
+                Map<String, Set<String>> columnToDependencies,
+                Set<String> primaryKeys,
+                Set<String> watermarkReferences,
+                Set<String> partitionKeys) {
+            this.columns = columns;
+            this.columnToReferences = columnToReferences;
+            this.columnToDependencies = columnToDependencies;
+            this.primaryKeys = primaryKeys;
+            this.watermarkReferences = watermarkReferences;
+            this.partitionKeys = partitionKeys;
+        }
+
+        static ReferencesManager create(ResolvedCatalogTable catalogTable) {
+            Map<String, Set<String>> columnToReferences = new HashMap<>();
+            Map<String, Set<String>> columnToDependencies = new HashMap<>();
+            catalogTable.getResolvedSchema().getColumns().stream()
+                    .filter(column -> column instanceof Column.ComputedColumn)
+                    .forEach(
+                            column -> {
+                                Set<String> referencedColumns =
+                                        
ColumnReferenceFinder.findReferencedColumn(
+                                                column.getName(), 
catalogTable.getResolvedSchema());
+                                for (String referencedColumn : 
referencedColumns) {
+                                    columnToReferences
+                                            .computeIfAbsent(
+                                                    referencedColumn, key -> 
new HashSet<>())
+                                            .add(column.getName());
+                                    columnToDependencies
+                                            .computeIfAbsent(
+                                                    column.getName(), key -> 
new HashSet<>())
+                                            .add(referencedColumn);
+                                }
+                            });
 
-    private void validateColumnName(
-            String oldColumnName,
-            String newColumnName,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys) {
-        validateColumnName(
-                oldColumnName,
-                oldSchema,
-                partitionKeys,
-                // fail the operation of renaming column, once the column 
derives a computed column
-                (referencedColumn, computedColumn) -> 
referencedColumn.contains(oldColumnName));
-        // validate new column
-        if (oldSchema.getColumn(newColumnName).isPresent()) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` already existed in table 
schema.",
-                            EX_MSG_PREFIX, newColumnName));
+            return new ReferencesManager(
+                    new 
HashSet<>(catalogTable.getResolvedSchema().getColumnNames()),
+                    columnToReferences,
+                    columnToDependencies,
+                    catalogTable
+                            .getResolvedSchema()
+                            .getPrimaryKey()
+                            .map(constraint -> new 
HashSet<>(constraint.getColumns()))
+                            .orElse(new HashSet<>()),
+                    ColumnReferenceFinder.findWatermarkReferencedColumn(
+                            catalogTable.getResolvedSchema()),
+                    new HashSet<>(catalogTable.getPartitionKeys()));
+        }
+
+        void dropColumn(String columnName) {
+            checkReferences(columnName);
+            if (primaryKeys.contains(columnName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column %s is used as the primary key.",
+                                EX_MSG_PREFIX, 
EncodingUtils.escapeIdentifier(columnName)));
+            }
+
+            columnToDependencies
+                    .getOrDefault(columnName, Collections.emptySet())
+                    .forEach(
+                            referredColumn ->
+                                    
columnToReferences.get(referredColumn).remove(columnName));
+            columnToDependencies.remove(columnName);
+            columns.remove(columnName);
         }
-    }
 
-    private void validateColumnName(
-            String columnToDrop,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys,
-            Set<String> columnsToDrop) {
-        validateColumnName(
-                columnToDrop,
-                oldSchema,
-                partitionKeys,
-                // fail the operation of dropping column, only if the column 
derives a computed
-                // column, and the computed column is not being dropped along 
with the old column
-                (referencedColumn, computedColumn) ->
-                        referencedColumn.contains(columnToDrop)
-                                && 
!columnsToDrop.contains(computedColumn.getName()));
-        oldSchema
-                .getPrimaryKey()
-                .ifPresent(
-                        pk -> {
-                            if (pk.getColumns().contains(columnToDrop)) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "%sThe column `%s` is used as 
the primary key.",
-                                                EX_MSG_PREFIX, columnToDrop));
-                            }
-                        });
-    }
+        void renameColumn(String columnName, String newName) {
+            checkReferences(columnName);
+            if (columns.contains(newName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column `%s` already existed in table 
schema.",
+                                EX_MSG_PREFIX, newName));
+            }
 
-    private void validateColumnName(
-            String columnToAlter,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys,
-            BiFunction<Set<String>, Column.ComputedColumn, Boolean> 
computedColumnChecker) {
-        // validate old column
-        Set<String> tableColumns = new HashSet<>(oldSchema.getColumnNames());
-        if (!tableColumns.contains(columnToAlter)) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` does not exist in the base 
table.",
-                            EX_MSG_PREFIX, columnToAlter));
+            columnToDependencies
+                    .getOrDefault(columnName, Collections.emptySet())
+                    .forEach(
+                            referredColumn -> {
+                                
columnToReferences.get(referredColumn).remove(columnName);
+                                
columnToReferences.get(referredColumn).add(newName);
+                            });
+            columnToDependencies.put(newName, 
columnToDependencies.remove(columnName));
+
+            columns.remove(columnName);
+            columns.add(newName);
+
+            primaryKeys.remove(columnName);
+            primaryKeys.add(newName);
         }
 
-        // validate old column name isn't referred by computed column case
-        oldSchema.getColumns().stream()
-                .filter(column -> column instanceof Column.ComputedColumn)
-                .forEach(
-                        column -> {
-                            Column.ComputedColumn computedColumn = 
(Column.ComputedColumn) column;
-                            Set<String> referencedColumn =
-                                    ColumnReferenceFinder.findReferencedColumn(
-                                            computedColumn.getName(), 
oldSchema);
-                            if (computedColumnChecker.apply(referencedColumn, 
computedColumn)) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "%sThe column `%s` is 
referenced by computed column %s.",
-                                                EX_MSG_PREFIX,
-                                                columnToAlter,
-                                                
computedColumn.asSummaryString()));
-                            }
-                        });
-        // validate partition keys doesn't contain the old column
-        if (partitionKeys.contains(columnToAlter)) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` is used as the partition keys.",
-                            EX_MSG_PREFIX, columnToAlter));
+        Set<String> lookupColumnDependencies(String columnName) {
+            return columnToDependencies.getOrDefault(columnName, Collections.emptySet());
         }

Review Comment:
   Nit: what about
   ```suggestion
           int getColumnDependencyCount(String columnName) {
               return columnToDependencies.getOrDefault(columnName, Collections.emptySet()).size();
           }
   ```
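
The review comment does not state why a count is preferred. One possible motivation, which is an assumption here, is that returning the set hands callers a reference to internal state. The sketch below compares the two accessors on a stand-in class. `DependencyCountSketch` and `addDependency` are hypothetical; only the two accessor bodies mirror the diff and the suggestion.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DependencyCountSketch {

    // Computed column -> columns its expression references, e.g. b -> {a} for "b AS a + 1".
    private final Map<String, Set<String>> columnToDependencies = new HashMap<>();

    /** Hypothetical helper used only to populate the map for this sketch. */
    public void addDependency(String column, String referencedColumn) {
        columnToDependencies.computeIfAbsent(column, key -> new HashSet<>()).add(referencedColumn);
    }

    /** Accessor as written in the diff: exposes the internal set when one exists. */
    public Set<String> lookupColumnDependencies(String columnName) {
        return columnToDependencies.getOrDefault(columnName, Collections.emptySet());
    }

    /** Accessor as suggested in the review: exposes only the size. */
    public int getColumnDependencyCount(String columnName) {
        return columnToDependencies.getOrDefault(columnName, Collections.emptySet()).size();
    }

    public static void main(String[] args) {
        DependencyCountSketch sketch = new DependencyCountSketch();
        sketch.addDependency("b", "a");
        System.out.println(sketch.getColumnDependencyCount("b")); // 1
        System.out.println(sketch.getColumnDependencyCount("a")); // 0

        // A caller holding the returned set can change the manager's state.
        sketch.lookupColumnDependencies("b").clear();
        System.out.println(sketch.getColumnDependencyCount("b")); // 0
    }
}
```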



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -728,114 +724,166 @@ String getComment(SqlTableColumn column) {
         }
     }
 
-    // 
--------------------------------------------------------------------------------------------
+    private static class ReferencesManager {
+
+        /** Available columns in the table. */
+        private final Set<String> columns;
+
+        /**
+         * Mappings about the column refers which columns, e.g. column `b` 
refers to the column `a`
+         * in the expression "b as a+1".
+         */
+        private final Map<String, Set<String>> columnToReferences;
+
+        /**
+         * Reverse mappings about the column refers which columns, e.g. column 
`a` has the
+         * dependency of column `b` in the expression "b as a+1".
+         */
+        private final Map<String, Set<String>> columnToDependencies;
+
+        /** Primary keys defined on the table. */
+        private final Set<String> primaryKeys;
+
+        /** The name of the column watermark expression depends on. */
+        private final Set<String> watermarkReferences;
+
+        /** The name of the column partition keys contains. */
+        private final Set<String> partitionKeys;
+
+        private ReferencesManager(
+                Set<String> columns,
+                Map<String, Set<String>> columnToReferences,
+                Map<String, Set<String>> columnToDependencies,
+                Set<String> primaryKeys,
+                Set<String> watermarkReferences,
+                Set<String> partitionKeys) {
+            this.columns = columns;
+            this.columnToReferences = columnToReferences;
+            this.columnToDependencies = columnToDependencies;
+            this.primaryKeys = primaryKeys;
+            this.watermarkReferences = watermarkReferences;
+            this.partitionKeys = partitionKeys;
+        }
+
+        static ReferencesManager create(ResolvedCatalogTable catalogTable) {
+            Map<String, Set<String>> columnToReferences = new HashMap<>();
+            Map<String, Set<String>> columnToDependencies = new HashMap<>();
+            catalogTable.getResolvedSchema().getColumns().stream()
+                    .filter(column -> column instanceof Column.ComputedColumn)
+                    .forEach(
+                            column -> {
+                                Set<String> referencedColumns =
+                                        
ColumnReferenceFinder.findReferencedColumn(
+                                                column.getName(), 
catalogTable.getResolvedSchema());
+                                for (String referencedColumn : 
referencedColumns) {
+                                    columnToReferences
+                                            .computeIfAbsent(
+                                                    referencedColumn, key -> 
new HashSet<>())
+                                            .add(column.getName());
+                                    columnToDependencies
+                                            .computeIfAbsent(
+                                                    column.getName(), key -> 
new HashSet<>())
+                                            .add(referencedColumn);
+                                }
+                            });
 
-    private void validateColumnName(
-            String oldColumnName,
-            String newColumnName,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys) {
-        validateColumnName(
-                oldColumnName,
-                oldSchema,
-                partitionKeys,
-                // fail the operation of renaming column, once the column 
derives a computed column
-                (referencedColumn, computedColumn) -> 
referencedColumn.contains(oldColumnName));
-        // validate new column
-        if (oldSchema.getColumn(newColumnName).isPresent()) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` already existed in table 
schema.",
-                            EX_MSG_PREFIX, newColumnName));
+            return new ReferencesManager(
+                    new 
HashSet<>(catalogTable.getResolvedSchema().getColumnNames()),
+                    columnToReferences,
+                    columnToDependencies,
+                    catalogTable
+                            .getResolvedSchema()
+                            .getPrimaryKey()
+                            .map(constraint -> new 
HashSet<>(constraint.getColumns()))
+                            .orElse(new HashSet<>()),
+                    ColumnReferenceFinder.findWatermarkReferencedColumn(
+                            catalogTable.getResolvedSchema()),
+                    new HashSet<>(catalogTable.getPartitionKeys()));
+        }
+
+        void dropColumn(String columnName) {
+            checkReferences(columnName);
+            if (primaryKeys.contains(columnName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column %s is used as the primary key.",
+                                EX_MSG_PREFIX, 
EncodingUtils.escapeIdentifier(columnName)));
+            }
+
+            columnToDependencies
+                    .getOrDefault(columnName, Collections.emptySet())
+                    .forEach(
+                            referredColumn ->
+                                    
columnToReferences.get(referredColumn).remove(columnName));
+            columnToDependencies.remove(columnName);
+            columns.remove(columnName);
         }
-    }
 
-    private void validateColumnName(
-            String columnToDrop,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys,
-            Set<String> columnsToDrop) {
-        validateColumnName(
-                columnToDrop,
-                oldSchema,
-                partitionKeys,
-                // fail the operation of dropping column, only if the column 
derives a computed
-                // column, and the computed column is not being dropped along 
with the old column
-                (referencedColumn, computedColumn) ->
-                        referencedColumn.contains(columnToDrop)
-                                && 
!columnsToDrop.contains(computedColumn.getName()));
-        oldSchema
-                .getPrimaryKey()
-                .ifPresent(
-                        pk -> {
-                            if (pk.getColumns().contains(columnToDrop)) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "%sThe column `%s` is used as 
the primary key.",
-                                                EX_MSG_PREFIX, columnToDrop));
-                            }
-                        });
-    }
+        void renameColumn(String columnName, String newName) {

Review Comment:
   Nit: `columnName` -> `oldColumnName`?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:
##########
@@ -1507,19 +1507,15 @@ public void testFailedToAlterTableDropWatermark() throws Exception {
     public void testAlterTableDropWatermark() throws Exception {
         prepareNonManagedTable("tb1", true);
         Operation operation = parse("alter table tb1 drop watermark");
-        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
         assertThat(operation.asSummaryString())
-                .isEqualTo(
-                        "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
-                                + "  `a` INT NOT NULL,\n"
-                                + "  `b` BIGINT NOT NULL,\n"
-                                + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
-                                + "  `d` AS [a*(b+2 + a*b)],\n"
-                                + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
-                                + "  `f` AS [e.f1 + e.f2.f0],\n"
-                                + "  `g` METADATA VIRTUAL,\n"
-                                + "  `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
-                                + ")");
+                .isEqualTo("ALTER TABLE cat1.db1.tb1\n  DROP WATERMARK");
+        assertThat(
+                        ((AlterTableChangeOperation) operation)
+                                .getNewTable()
+                                .getUnresolvedSchema()
+                                .getWatermarkSpecs())
+                .isEqualTo(Collections.emptyList());

Review Comment:
   ```suggestion
                   .isEmpty();
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -728,114 +724,166 @@ String getComment(SqlTableColumn column) {
         }
     }
 
-    // 
--------------------------------------------------------------------------------------------
+    private static class ReferencesManager {
+
+        /** Available columns in the table. */
+        private final Set<String> columns;
+
+        /**
+         * Mappings about the column refers which columns, e.g. column `b` 
refers to the column `a`
+         * in the expression "b as a+1".
+         */
+        private final Map<String, Set<String>> columnToReferences;
+
+        /**
+         * Reverse mappings about the column refers which columns, e.g. column 
`a` has the
+         * dependency of column `b` in the expression "b as a+1".
+         */
+        private final Map<String, Set<String>> columnToDependencies;
+
+        /** Primary keys defined on the table. */
+        private final Set<String> primaryKeys;
+
+        /** The name of the column watermark expression depends on. */
+        private final Set<String> watermarkReferences;
+
+        /** The name of the column partition keys contains. */
+        private final Set<String> partitionKeys;
+
+        private ReferencesManager(
+                Set<String> columns,
+                Map<String, Set<String>> columnToReferences,
+                Map<String, Set<String>> columnToDependencies,
+                Set<String> primaryKeys,
+                Set<String> watermarkReferences,
+                Set<String> partitionKeys) {
+            this.columns = columns;
+            this.columnToReferences = columnToReferences;
+            this.columnToDependencies = columnToDependencies;
+            this.primaryKeys = primaryKeys;
+            this.watermarkReferences = watermarkReferences;
+            this.partitionKeys = partitionKeys;
+        }
+
+        static ReferencesManager create(ResolvedCatalogTable catalogTable) {
+            Map<String, Set<String>> columnToReferences = new HashMap<>();
+            Map<String, Set<String>> columnToDependencies = new HashMap<>();
+            catalogTable.getResolvedSchema().getColumns().stream()
+                    .filter(column -> column instanceof Column.ComputedColumn)
+                    .forEach(
+                            column -> {
+                                Set<String> referencedColumns =
+                                        
ColumnReferenceFinder.findReferencedColumn(
+                                                column.getName(), 
catalogTable.getResolvedSchema());
+                                for (String referencedColumn : 
referencedColumns) {
+                                    columnToReferences
+                                            .computeIfAbsent(
+                                                    referencedColumn, key -> 
new HashSet<>())
+                                            .add(column.getName());
+                                    columnToDependencies
+                                            .computeIfAbsent(
+                                                    column.getName(), key -> 
new HashSet<>())
+                                            .add(referencedColumn);
+                                }
+                            });
 
-    private void validateColumnName(
-            String oldColumnName,
-            String newColumnName,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys) {
-        validateColumnName(
-                oldColumnName,
-                oldSchema,
-                partitionKeys,
-                // fail the operation of renaming column, once the column 
derives a computed column
-                (referencedColumn, computedColumn) -> 
referencedColumn.contains(oldColumnName));
-        // validate new column
-        if (oldSchema.getColumn(newColumnName).isPresent()) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` already existed in table 
schema.",
-                            EX_MSG_PREFIX, newColumnName));
+            return new ReferencesManager(
+                    new 
HashSet<>(catalogTable.getResolvedSchema().getColumnNames()),
+                    columnToReferences,
+                    columnToDependencies,
+                    catalogTable
+                            .getResolvedSchema()
+                            .getPrimaryKey()
+                            .map(constraint -> new 
HashSet<>(constraint.getColumns()))
+                            .orElse(new HashSet<>()),
+                    ColumnReferenceFinder.findWatermarkReferencedColumn(
+                            catalogTable.getResolvedSchema()),
+                    new HashSet<>(catalogTable.getPartitionKeys()));
+        }
+
+        void dropColumn(String columnName) {
+            checkReferences(columnName);
+            if (primaryKeys.contains(columnName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column %s is used as the primary key.",
+                                EX_MSG_PREFIX, 
EncodingUtils.escapeIdentifier(columnName)));
+            }
+
+            columnToDependencies
+                    .getOrDefault(columnName, Collections.emptySet())
+                    .forEach(
+                            referredColumn ->
+                                    
columnToReferences.get(referredColumn).remove(columnName));
+            columnToDependencies.remove(columnName);
+            columns.remove(columnName);
         }
-    }
 
-    private void validateColumnName(
-            String columnToDrop,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys,
-            Set<String> columnsToDrop) {
-        validateColumnName(
-                columnToDrop,
-                oldSchema,
-                partitionKeys,
-                // fail the operation of dropping column, only if the column 
derives a computed
-                // column, and the computed column is not being dropped along 
with the old column
-                (referencedColumn, computedColumn) ->
-                        referencedColumn.contains(columnToDrop)
-                                && 
!columnsToDrop.contains(computedColumn.getName()));
-        oldSchema
-                .getPrimaryKey()
-                .ifPresent(
-                        pk -> {
-                            if (pk.getColumns().contains(columnToDrop)) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "%sThe column `%s` is used as 
the primary key.",
-                                                EX_MSG_PREFIX, columnToDrop));
-                            }
-                        });
-    }
+        void renameColumn(String columnName, String newName) {
+            checkReferences(columnName);
+            if (columns.contains(newName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column `%s` already existed in table 
schema.",
+                                EX_MSG_PREFIX, newName));
+            }
 
-    private void validateColumnName(
-            String columnToAlter,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys,
-            BiFunction<Set<String>, Column.ComputedColumn, Boolean> 
computedColumnChecker) {
-        // validate old column
-        Set<String> tableColumns = new HashSet<>(oldSchema.getColumnNames());
-        if (!tableColumns.contains(columnToAlter)) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` does not exist in the base 
table.",
-                            EX_MSG_PREFIX, columnToAlter));
+            columnToDependencies
+                    .getOrDefault(columnName, Collections.emptySet())
+                    .forEach(
+                            referredColumn -> {
+                                
columnToReferences.get(referredColumn).remove(columnName);
+                                
columnToReferences.get(referredColumn).add(newName);
+                            });
+            columnToDependencies.put(newName, 
columnToDependencies.remove(columnName));
+
+            columns.remove(columnName);
+            columns.add(newName);
+
+            primaryKeys.remove(columnName);

Review Comment:
   I don't think we can unconditionally add `newName` to `primaryKeys`; it should only be added if `columnName` was part of the primary key.
   
   ```suggestion
   if (primaryKeys.remove(columnName)) {
       primaryKeys.add(newName);
   }
   ```
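
To illustrate the suggestion above, here is a minimal, self-contained sketch. The class `PrimaryKeyRenameSketch` and the method `renameInPrimaryKeys` are hypothetical names used only for illustration and are not part of the PR. It relies solely on `Set.remove` returning `true` when the element was present, so the new name ends up in the set only if the renamed column actually belonged to the primary key.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration only; not code from the PR. */
final class PrimaryKeyRenameSketch {

    /** Replaces columnName with newName in primaryKeys, but only if columnName was a key column. */
    static void renameInPrimaryKeys(Set<String> primaryKeys, String columnName, String newName) {
        // Set.remove returns true only when the element was actually present.
        if (primaryKeys.remove(columnName)) {
            primaryKeys.add(newName);
        }
    }

    public static void main(String[] args) {
        Set<String> primaryKeys = new HashSet<>(Arrays.asList("a", "b"));
        // `c` is not a key column, so the set is left untouched.
        renameInPrimaryKeys(primaryKeys, "c", "c2");
        // `a` is a key column, so it is replaced by `a2`.
        renameInPrimaryKeys(primaryKeys, "a", "a2");
        System.out.println(primaryKeys);
    }
}
```

With an unconditional `primaryKeys.add(newName)`, the first call would wrongly turn `c2` into a primary key column.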



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:
##########
@@ -1474,25 +1470,29 @@ public void testFailedToAlterTableDropConstraint() 
throws Exception {
     @Test
     public void testAlterTableDropConstraint() throws Exception {
         prepareNonManagedTable(true);
-        String expectedSummaryString =
-                "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
-                        + "  `a` INT NOT NULL,\n"
-                        + "  `b` BIGINT NOT NULL,\n"
-                        + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
-                        + "  `d` AS [a*(b+2 + a*b)],\n"
-                        + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` 
DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
-                        + "  `f` AS [e.f1 + e.f2.f0],\n"
-                        + "  `g` METADATA VIRTUAL,\n"
-                        + "  `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
-                        + ")";
+        String expectedSummaryString = "ALTER TABLE cat1.db1.tb1\n  DROP 
CONSTRAINT ct1";
 
         Operation operation = parse("alter table tb1 drop constraint ct1");
-        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
         
assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
+        assertThat(
+                        ((AlterTableChangeOperation) operation)
+                                .getNewTable()
+                                .getUnresolvedSchema()
+                                .getPrimaryKey()
+                                .isPresent())

Review Comment:
   ```suggestion
                                   isNotPresent()
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:
##########
@@ -1474,25 +1470,29 @@ public void testFailedToAlterTableDropConstraint() 
throws Exception {
     @Test
     public void testAlterTableDropConstraint() throws Exception {
         prepareNonManagedTable(true);
-        String expectedSummaryString =
-                "ALTER TABLE cat1.db1.tb1 SET SCHEMA (\n"
-                        + "  `a` INT NOT NULL,\n"
-                        + "  `b` BIGINT NOT NULL,\n"
-                        + "  `c` STRING NOT NULL COMMENT 'column comment',\n"
-                        + "  `d` AS [a*(b+2 + a*b)],\n"
-                        + "  `e` ROW<`f0` STRING, `f1` INT, `f2` ROW<`f0` 
DOUBLE, `f1` ARRAY<FLOAT>>>,\n"
-                        + "  `f` AS [e.f1 + e.f2.f0],\n"
-                        + "  `g` METADATA VIRTUAL,\n"
-                        + "  `ts` TIMESTAMP(3) COMMENT 'just a comment'\n"
-                        + ")";
+        String expectedSummaryString = "ALTER TABLE cat1.db1.tb1\n  DROP 
CONSTRAINT ct1";
 
         Operation operation = parse("alter table tb1 drop constraint ct1");
-        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
         
assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
+        assertThat(
+                        ((AlterTableChangeOperation) operation)
+                                .getNewTable()
+                                .getUnresolvedSchema()
+                                .getPrimaryKey()
+                                .isPresent())
+                .isEqualTo(false);
 
         operation = parse("alter table tb1 drop primary key");
-        assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
+        assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
         
assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
+        assertThat(
+                        ((AlterTableChangeOperation) operation)
+                                .getNewTable()
+                                .getUnresolvedSchema()
+                                .getPrimaryKey()
+                                .isPresent())
+                .isEqualTo(false);

Review Comment:
   Nit
   ```suggestion
   assertThat(
                           ((AlterTableChangeOperation) operation)
                                   .getNewTable()
                                   .getUnresolvedSchema()
                                   .getPrimaryKey())
                   .isNotPresent();
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -728,114 +724,166 @@ String getComment(SqlTableColumn column) {
         }
     }
 
-    // 
--------------------------------------------------------------------------------------------
+    private static class ReferencesManager {
+
+        /** Available columns in the table. */
+        private final Set<String> columns;
+
+        /**
+         * Mappings about the column refers which columns, e.g. column `b` 
refers to the column `a`
+         * in the expression "b as a+1".
+         */
+        private final Map<String, Set<String>> columnToReferences;
+
+        /**
+         * Reverse mappings about the column refers which columns, e.g. column 
`a` has the
+         * dependency of column `b` in the expression "b as a+1".
+         */
+        private final Map<String, Set<String>> columnToDependencies;
+
+        /** Primary keys defined on the table. */
+        private final Set<String> primaryKeys;
+
+        /** The name of the column watermark expression depends on. */
+        private final Set<String> watermarkReferences;
+
+        /** The name of the column partition keys contains. */
+        private final Set<String> partitionKeys;
+
+        private ReferencesManager(
+                Set<String> columns,
+                Map<String, Set<String>> columnToReferences,
+                Map<String, Set<String>> columnToDependencies,
+                Set<String> primaryKeys,
+                Set<String> watermarkReferences,
+                Set<String> partitionKeys) {
+            this.columns = columns;
+            this.columnToReferences = columnToReferences;
+            this.columnToDependencies = columnToDependencies;
+            this.primaryKeys = primaryKeys;
+            this.watermarkReferences = watermarkReferences;
+            this.partitionKeys = partitionKeys;
+        }
+
+        static ReferencesManager create(ResolvedCatalogTable catalogTable) {
+            Map<String, Set<String>> columnToReferences = new HashMap<>();
+            Map<String, Set<String>> columnToDependencies = new HashMap<>();
+            catalogTable.getResolvedSchema().getColumns().stream()
+                    .filter(column -> column instanceof Column.ComputedColumn)
+                    .forEach(
+                            column -> {
+                                Set<String> referencedColumns =
+                                        
ColumnReferenceFinder.findReferencedColumn(
+                                                column.getName(), 
catalogTable.getResolvedSchema());
+                                for (String referencedColumn : 
referencedColumns) {
+                                    columnToReferences
+                                            .computeIfAbsent(
+                                                    referencedColumn, key -> 
new HashSet<>())
+                                            .add(column.getName());
+                                    columnToDependencies
+                                            .computeIfAbsent(
+                                                    column.getName(), key -> 
new HashSet<>())
+                                            .add(referencedColumn);
+                                }
+                            });
 
-    private void validateColumnName(
-            String oldColumnName,
-            String newColumnName,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys) {
-        validateColumnName(
-                oldColumnName,
-                oldSchema,
-                partitionKeys,
-                // fail the operation of renaming column, once the column 
derives a computed column
-                (referencedColumn, computedColumn) -> 
referencedColumn.contains(oldColumnName));
-        // validate new column
-        if (oldSchema.getColumn(newColumnName).isPresent()) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` already existed in table 
schema.",
-                            EX_MSG_PREFIX, newColumnName));
+            return new ReferencesManager(
+                    new 
HashSet<>(catalogTable.getResolvedSchema().getColumnNames()),
+                    columnToReferences,
+                    columnToDependencies,
+                    catalogTable
+                            .getResolvedSchema()
+                            .getPrimaryKey()
+                            .map(constraint -> new 
HashSet<>(constraint.getColumns()))
+                            .orElse(new HashSet<>()),
+                    ColumnReferenceFinder.findWatermarkReferencedColumn(
+                            catalogTable.getResolvedSchema()),
+                    new HashSet<>(catalogTable.getPartitionKeys()));
+        }
+
+        void dropColumn(String columnName) {
+            checkReferences(columnName);
+            if (primaryKeys.contains(columnName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column %s is used as the primary key.",
+                                EX_MSG_PREFIX, 
EncodingUtils.escapeIdentifier(columnName)));
+            }
+
+            columnToDependencies
+                    .getOrDefault(columnName, Collections.emptySet())
+                    .forEach(
+                            referredColumn ->
+                                    
columnToReferences.get(referredColumn).remove(columnName));
+            columnToDependencies.remove(columnName);
+            columns.remove(columnName);
         }
-    }
 
-    private void validateColumnName(
-            String columnToDrop,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys,
-            Set<String> columnsToDrop) {
-        validateColumnName(
-                columnToDrop,
-                oldSchema,
-                partitionKeys,
-                // fail the operation of dropping column, only if the column 
derives a computed
-                // column, and the computed column is not being dropped along 
with the old column
-                (referencedColumn, computedColumn) ->
-                        referencedColumn.contains(columnToDrop)
-                                && 
!columnsToDrop.contains(computedColumn.getName()));
-        oldSchema
-                .getPrimaryKey()
-                .ifPresent(
-                        pk -> {
-                            if (pk.getColumns().contains(columnToDrop)) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "%sThe column `%s` is used as 
the primary key.",
-                                                EX_MSG_PREFIX, columnToDrop));
-                            }
-                        });
-    }
+        void renameColumn(String columnName, String newName) {
+            checkReferences(columnName);
+            if (columns.contains(newName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column `%s` already existed in table 
schema.",
+                                EX_MSG_PREFIX, newName));
+            }
 
-    private void validateColumnName(
-            String columnToAlter,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys,
-            BiFunction<Set<String>, Column.ComputedColumn, Boolean> 
computedColumnChecker) {
-        // validate old column
-        Set<String> tableColumns = new HashSet<>(oldSchema.getColumnNames());
-        if (!tableColumns.contains(columnToAlter)) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` does not exist in the base 
table.",
-                            EX_MSG_PREFIX, columnToAlter));
+            columnToDependencies
+                    .getOrDefault(columnName, Collections.emptySet())
+                    .forEach(
+                            referredColumn -> {
+                                
columnToReferences.get(referredColumn).remove(columnName);
+                                
columnToReferences.get(referredColumn).add(newName);
+                            });
+            columnToDependencies.put(newName, 
columnToDependencies.remove(columnName));
+
+            columns.remove(columnName);
+            columns.add(newName);
+
+            primaryKeys.remove(columnName);

Review Comment:
   By the way, I don't think we need to update `primaryKeys` here, because `buildUpdatedPrimaryKey` will compute the primary key name.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -728,114 +724,166 @@ String getComment(SqlTableColumn column) {
         }
     }
 
-    // 
--------------------------------------------------------------------------------------------
+    private static class ReferencesManager {
+
+        /** Available columns in the table. */
+        private final Set<String> columns;
+
+        /**
+         * Mappings about the column refers which columns, e.g. column `b` 
refers to the column `a`
+         * in the expression "b as a+1".
+         */
+        private final Map<String, Set<String>> columnToReferences;
+
+        /**
+         * Reverse mappings about the column refers which columns, e.g. column 
`a` has the
+         * dependency of column `b` in the expression "b as a+1".
+         */
+        private final Map<String, Set<String>> columnToDependencies;
+
+        /** Primary keys defined on the table. */
+        private final Set<String> primaryKeys;
+
+        /** The name of the column watermark expression depends on. */
+        private final Set<String> watermarkReferences;
+
+        /** The name of the column partition keys contains. */
+        private final Set<String> partitionKeys;
+
+        private ReferencesManager(
+                Set<String> columns,
+                Map<String, Set<String>> columnToReferences,
+                Map<String, Set<String>> columnToDependencies,
+                Set<String> primaryKeys,
+                Set<String> watermarkReferences,
+                Set<String> partitionKeys) {
+            this.columns = columns;
+            this.columnToReferences = columnToReferences;
+            this.columnToDependencies = columnToDependencies;
+            this.primaryKeys = primaryKeys;
+            this.watermarkReferences = watermarkReferences;
+            this.partitionKeys = partitionKeys;
+        }
+
+        static ReferencesManager create(ResolvedCatalogTable catalogTable) {
+            Map<String, Set<String>> columnToReferences = new HashMap<>();
+            Map<String, Set<String>> columnToDependencies = new HashMap<>();
+            catalogTable.getResolvedSchema().getColumns().stream()
+                    .filter(column -> column instanceof Column.ComputedColumn)
+                    .forEach(
+                            column -> {
+                                Set<String> referencedColumns =
+                                        
ColumnReferenceFinder.findReferencedColumn(
+                                                column.getName(), 
catalogTable.getResolvedSchema());
+                                for (String referencedColumn : 
referencedColumns) {
+                                    columnToReferences
+                                            .computeIfAbsent(
+                                                    referencedColumn, key -> 
new HashSet<>())
+                                            .add(column.getName());
+                                    columnToDependencies
+                                            .computeIfAbsent(
+                                                    column.getName(), key -> 
new HashSet<>())
+                                            .add(referencedColumn);
+                                }
+                            });
 
-    private void validateColumnName(
-            String oldColumnName,
-            String newColumnName,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys) {
-        validateColumnName(
-                oldColumnName,
-                oldSchema,
-                partitionKeys,
-                // fail the operation of renaming column, once the column 
derives a computed column
-                (referencedColumn, computedColumn) -> 
referencedColumn.contains(oldColumnName));
-        // validate new column
-        if (oldSchema.getColumn(newColumnName).isPresent()) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` already existed in table 
schema.",
-                            EX_MSG_PREFIX, newColumnName));
+            return new ReferencesManager(
+                    new 
HashSet<>(catalogTable.getResolvedSchema().getColumnNames()),
+                    columnToReferences,
+                    columnToDependencies,
+                    catalogTable
+                            .getResolvedSchema()
+                            .getPrimaryKey()
+                            .map(constraint -> new 
HashSet<>(constraint.getColumns()))
+                            .orElse(new HashSet<>()),
+                    ColumnReferenceFinder.findWatermarkReferencedColumn(
+                            catalogTable.getResolvedSchema()),
+                    new HashSet<>(catalogTable.getPartitionKeys()));
+        }
+
+        void dropColumn(String columnName) {
+            checkReferences(columnName);
+            if (primaryKeys.contains(columnName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column %s is used as the primary key.",
+                                EX_MSG_PREFIX, 
EncodingUtils.escapeIdentifier(columnName)));
+            }
+
+            columnToDependencies
+                    .getOrDefault(columnName, Collections.emptySet())
+                    .forEach(
+                            referredColumn ->
+                                    
columnToReferences.get(referredColumn).remove(columnName));
+            columnToDependencies.remove(columnName);
+            columns.remove(columnName);
         }
-    }
 
-    private void validateColumnName(
-            String columnToDrop,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys,
-            Set<String> columnsToDrop) {
-        validateColumnName(
-                columnToDrop,
-                oldSchema,
-                partitionKeys,
-                // fail the operation of dropping column, only if the column 
derives a computed
-                // column, and the computed column is not being dropped along 
with the old column
-                (referencedColumn, computedColumn) ->
-                        referencedColumn.contains(columnToDrop)
-                                && 
!columnsToDrop.contains(computedColumn.getName()));
-        oldSchema
-                .getPrimaryKey()
-                .ifPresent(
-                        pk -> {
-                            if (pk.getColumns().contains(columnToDrop)) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "%sThe column `%s` is used as 
the primary key.",
-                                                EX_MSG_PREFIX, columnToDrop));
-                            }
-                        });
-    }
+        void renameColumn(String columnName, String newName) {
+            checkReferences(columnName);
+            if (columns.contains(newName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column `%s` already existed in table 
schema.",
+                                EX_MSG_PREFIX, newName));
+            }
 
-    private void validateColumnName(
-            String columnToAlter,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys,
-            BiFunction<Set<String>, Column.ComputedColumn, Boolean> 
computedColumnChecker) {
-        // validate old column
-        Set<String> tableColumns = new HashSet<>(oldSchema.getColumnNames());
-        if (!tableColumns.contains(columnToAlter)) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` does not exist in the base 
table.",
-                            EX_MSG_PREFIX, columnToAlter));
+            columnToDependencies
+                    .getOrDefault(columnName, Collections.emptySet())
+                    .forEach(
+                            referredColumn -> {
+                                
columnToReferences.get(referredColumn).remove(columnName);
+                                
columnToReferences.get(referredColumn).add(newName);
+                            });
+            columnToDependencies.put(newName, 
columnToDependencies.remove(columnName));
+
+            columns.remove(columnName);
+            columns.add(newName);

Review Comment:
   Do we need to update `columnToDependencies` and `columnToReferences`? 
   `ALTER TABLE RENAME` is not a bulk operation, so nothing will read from these maps afterwards.
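
For context on the question above, here is a small sketch of what the two maps hold for a computed column `b AS a + 1`, and what a rename of `b` has to touch if the maps are kept in sync. The class and method names are hypothetical. The key/value directions are an assumption read off the `create` method in the diff: referenced column to the computed columns that use it, and computed column to the columns it depends on.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration only; not code from the PR. */
final class ReferenceMapsSketch {

    // referenced column -> computed columns whose expressions use it
    final Map<String, Set<String>> columnToReferences = new HashMap<>();
    // computed column -> columns its expression depends on
    final Map<String, Set<String>> columnToDependencies = new HashMap<>();

    /** Registers a computed column together with the columns its expression reads. */
    void addComputedColumn(String computed, String... referenced) {
        for (String ref : referenced) {
            columnToReferences.computeIfAbsent(ref, k -> new HashSet<>()).add(computed);
            columnToDependencies.computeIfAbsent(computed, k -> new HashSet<>()).add(ref);
        }
    }

    /** Keeps both maps consistent when a computed column is renamed. */
    void renameComputedColumn(String oldName, String newName) {
        Set<String> deps = columnToDependencies.remove(oldName);
        if (deps == null) {
            return; // not a computed column: nothing to update in this sketch
        }
        for (String ref : deps) {
            Set<String> refs = columnToReferences.get(ref);
            refs.remove(oldName);
            refs.add(newName);
        }
        columnToDependencies.put(newName, deps);
    }

    public static void main(String[] args) {
        ReferenceMapsSketch maps = new ReferenceMapsSketch();
        maps.addComputedColumn("b", "a"); // b AS a + 1
        maps.renameComputedColumn("b", "b2");
        System.out.println(maps.columnToReferences);   // {a=[b2]}
        System.out.println(maps.columnToDependencies); // {b2=[a]}
    }
}
```

If nothing reads the maps after a single rename, this bookkeeping only pays off when several changes are applied to the same manager instance.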



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
