fsk119 commented on code in PR #21592:
URL: https://github.com/apache/flink/pull/21592#discussion_r1061088996


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -728,114 +724,166 @@ String getComment(SqlTableColumn column) {
         }
     }
 
-    // 
--------------------------------------------------------------------------------------------
+    private static class ReferencesManager {
+
+        /** Available columns in the table. */
+        private final Set<String> columns;
+
+        /**
+         * Mappings about the column refers which columns, e.g. column `b` 
refers to the column `a`
+         * in the expression "b as a+1".
+         */
+        private final Map<String, Set<String>> columnToReferences;
+
+        /**
+         * Reverse mappings about the column refers which columns, e.g. column 
`a` has the
+         * dependency of column `b` in the expression "b as a+1".
+         */
+        private final Map<String, Set<String>> columnToDependencies;
+
+        /** Primary keys defined on the table. */
+        private final Set<String> primaryKeys;
+
+        /** The name of the column watermark expression depends on. */
+        private final Set<String> watermarkReferences;
+
+        /** The name of the column partition keys contains. */
+        private final Set<String> partitionKeys;
+
+        private ReferencesManager(
+                Set<String> columns,
+                Map<String, Set<String>> columnToReferences,
+                Map<String, Set<String>> columnToDependencies,
+                Set<String> primaryKeys,
+                Set<String> watermarkReferences,
+                Set<String> partitionKeys) {
+            this.columns = columns;
+            this.columnToReferences = columnToReferences;
+            this.columnToDependencies = columnToDependencies;
+            this.primaryKeys = primaryKeys;
+            this.watermarkReferences = watermarkReferences;
+            this.partitionKeys = partitionKeys;
+        }
+
+        static ReferencesManager create(ResolvedCatalogTable catalogTable) {
+            Map<String, Set<String>> columnToReferences = new HashMap<>();
+            Map<String, Set<String>> columnToDependencies = new HashMap<>();
+            catalogTable.getResolvedSchema().getColumns().stream()
+                    .filter(column -> column instanceof Column.ComputedColumn)
+                    .forEach(
+                            column -> {
+                                Set<String> referencedColumns =
+                                        
ColumnReferenceFinder.findReferencedColumn(
+                                                column.getName(), 
catalogTable.getResolvedSchema());
+                                for (String referencedColumn : 
referencedColumns) {
+                                    columnToReferences
+                                            .computeIfAbsent(
+                                                    referencedColumn, key -> 
new HashSet<>())
+                                            .add(column.getName());
+                                    columnToDependencies
+                                            .computeIfAbsent(
+                                                    column.getName(), key -> 
new HashSet<>())
+                                            .add(referencedColumn);
+                                }
+                            });
 
-    private void validateColumnName(
-            String oldColumnName,
-            String newColumnName,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys) {
-        validateColumnName(
-                oldColumnName,
-                oldSchema,
-                partitionKeys,
-                // fail the operation of renaming column, once the column 
derives a computed column
-                (referencedColumn, computedColumn) -> 
referencedColumn.contains(oldColumnName));
-        // validate new column
-        if (oldSchema.getColumn(newColumnName).isPresent()) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` already existed in table 
schema.",
-                            EX_MSG_PREFIX, newColumnName));
+            return new ReferencesManager(
+                    new 
HashSet<>(catalogTable.getResolvedSchema().getColumnNames()),
+                    columnToReferences,
+                    columnToDependencies,
+                    catalogTable
+                            .getResolvedSchema()
+                            .getPrimaryKey()
+                            .map(constraint -> new 
HashSet<>(constraint.getColumns()))
+                            .orElse(new HashSet<>()),
+                    ColumnReferenceFinder.findWatermarkReferencedColumn(
+                            catalogTable.getResolvedSchema()),
+                    new HashSet<>(catalogTable.getPartitionKeys()));
+        }
+
+        void dropColumn(String columnName) {
+            checkReferences(columnName);
+            if (primaryKeys.contains(columnName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column %s is used as the primary key.",
+                                EX_MSG_PREFIX, 
EncodingUtils.escapeIdentifier(columnName)));
+            }
+
+            columnToDependencies
+                    .getOrDefault(columnName, Collections.emptySet())
+                    .forEach(
+                            referredColumn ->
+                                    
columnToReferences.get(referredColumn).remove(columnName));
+            columnToDependencies.remove(columnName);
+            columns.remove(columnName);
         }
-    }
 
-    private void validateColumnName(
-            String columnToDrop,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys,
-            Set<String> columnsToDrop) {
-        validateColumnName(
-                columnToDrop,
-                oldSchema,
-                partitionKeys,
-                // fail the operation of dropping column, only if the column 
derives a computed
-                // column, and the computed column is not being dropped along 
with the old column
-                (referencedColumn, computedColumn) ->
-                        referencedColumn.contains(columnToDrop)
-                                && 
!columnsToDrop.contains(computedColumn.getName()));
-        oldSchema
-                .getPrimaryKey()
-                .ifPresent(
-                        pk -> {
-                            if (pk.getColumns().contains(columnToDrop)) {
-                                throw new ValidationException(
-                                        String.format(
-                                                "%sThe column `%s` is used as 
the primary key.",
-                                                EX_MSG_PREFIX, columnToDrop));
-                            }
-                        });
-    }
+        void renameColumn(String columnName, String newName) {
+            checkReferences(columnName);
+            if (columns.contains(newName)) {
+                throw new ValidationException(
+                        String.format(
+                                "%sThe column `%s` already existed in table 
schema.",
+                                EX_MSG_PREFIX, newName));
+            }
 
-    private void validateColumnName(
-            String columnToAlter,
-            ResolvedSchema oldSchema,
-            List<String> partitionKeys,
-            BiFunction<Set<String>, Column.ComputedColumn, Boolean> 
computedColumnChecker) {
-        // validate old column
-        Set<String> tableColumns = new HashSet<>(oldSchema.getColumnNames());
-        if (!tableColumns.contains(columnToAlter)) {
-            throw new ValidationException(
-                    String.format(
-                            "%sThe column `%s` does not exist in the base 
table.",
-                            EX_MSG_PREFIX, columnToAlter));
+            columnToDependencies
+                    .getOrDefault(columnName, Collections.emptySet())
+                    .forEach(
+                            referredColumn -> {
+                                
columnToReferences.get(referredColumn).remove(columnName);
+                                
columnToReferences.get(referredColumn).add(newName);
+                            });
+            columnToDependencies.put(newName, 
columnToDependencies.remove(columnName));
+
+            columns.remove(columnName);
+            columns.add(newName);
+
+            primaryKeys.remove(columnName);

Review Comment:
   OK, I will remove this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to