lsyldliu commented on code in PR #19329:
URL: https://github.com/apache/flink/pull/19329#discussion_r845772580


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -144,6 +147,146 @@ public static Operation convertChangeColumn(
         // TODO: handle watermark and constraints
     }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable,
+            ResolvedSchema originResolveSchema) {
+
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        List<String> columnNames =
+                originSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+        // validate old column is exists or new column is duplicated or old 
column is
+        // referenced by computed column
+        validateColumnName(originColumnName, newColumnName, columnNames, 
originResolveSchema);
+
+        // validate old column is referenced by watermark
+        List<org.apache.flink.table.catalog.WatermarkSpec> watermarkSpecs =
+                originResolveSchema.getWatermarkSpecs();
+        watermarkSpecs.forEach(
+                watermarkSpec -> {
+                    String rowtimeAttribute = 
watermarkSpec.getRowtimeAttribute();
+                    Set<String> referencedColumns =
+                            ColumnReferenceFinder.findReferencedColumn(
+                                    watermarkSpec.getWatermarkExpression(), 
columnNames);
+                    if (originColumnName.equals(rowtimeAttribute)
+                            || referencedColumns.contains(originColumnName)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Old column %s is referenced by 
watermark expression %s, "
+                                                + "currently doesn't allow to 
rename column which is "
+                                                + "referenced by watermark 
expression.",
+                                        originColumnName, 
watermarkSpec.asSummaryString()));
+                    }
+                });
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        originSchema.getColumns().stream()
+                .forEach(
+                        column -> {
+                            if (originColumnName.equals(column.getName())) {
+                                buildNewColumnFromOriginColumn(builder, 
column, newColumnName);
+                            } else {
+                                buildNewColumnFromOriginColumn(builder, 
column, column.getName());
+                            }
+                        });
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = 
originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            List<String> originPrimaryKeyNames = 
originPrimaryKey.get().getColumnNames();
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> newPrimaryKeyNames =
+                    originPrimaryKeyNames.stream()
+                            .map(pkName -> pkName.equals(originColumnName) ? 
newColumnName : pkName)
+                            .collect(Collectors.toList());
+
+            builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
+        }
+
+        // build watermark
+        originSchema.getWatermarkSpecs().stream()
+                .forEach(
+                        watermarkSpec ->
+                                builder.watermark(
+                                        watermarkSpec.getColumnName(),
+                                        
watermarkSpec.getWatermarkExpression()));
+
+        // build partition key
+        List<String> newPartitionKeys =
+                catalogTable.getPartitionKeys().stream()
+                        .map(name -> name.equals(originColumnName) ? 
newColumnName : name)
+                        .collect(Collectors.toList());
+
+        // generate new schema
+        return new AlterTableSchemaOperation(
+                tableIdentifier,
+                CatalogTable.of(
+                        builder.build(),
+                        catalogTable.getComment(),
+                        newPartitionKeys,
+                        catalogTable.getOptions()));
+    }
+
+    private static void validateColumnName(
+            String originColumnName,
+            String newColumnName,
+            List<String> columnNames,
+            ResolvedSchema originResolvedSchema) {
+        int originColumnIndex = columnNames.indexOf(originColumnName);
+        if (originColumnIndex < 0) {
+            throw new ValidationException(
+                    String.format("Old column %s not found for RENAME COLUMN 
", originColumnName));
+        }
+
+        int sameColumnNameIndex = columnNames.indexOf(newColumnName);
+        if (sameColumnNameIndex >= 0) {
+            throw new ValidationException(
+                    String.format("New column %s existed for RENAME COLUMN ", 
newColumnName));
+        }
+
+        // validate old column name is referenced by computed column case
+        originResolvedSchema.getColumns().stream()
+                .filter(column -> column instanceof Column.ComputedColumn)
+                .forEach(
+                        column -> {
+                            Column.ComputedColumn computedColumn = 
(Column.ComputedColumn) column;
+                            Set<String> referencedColumn =
+                                    ColumnReferenceFinder.findReferencedColumn(
+                                            computedColumn.getExpression(), 
columnNames);
+                            if (referencedColumn.contains(originColumnName)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Old column %s is referenced 
by computed column %s, currently doesn't "
+                                                        + "allow to rename 
column which is referenced by computed column.",
+                                                originColumnName,
+                                                
computedColumn.asSummaryString()));
+                            }
+                        });
+    }
+
+    private static void buildNewColumnFromOriginColumn(
+            Schema.Builder builder, Schema.UnresolvedColumn originColumn, 
String columnName) {
+        if (originColumn instanceof Schema.UnresolvedComputedColumn) {
+            builder.columnByExpression(
+                    columnName, ((Schema.UnresolvedComputedColumn) 
originColumn).getExpression());
+        } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
+            builder.column(
+                    columnName, ((Schema.UnresolvedPhysicalColumn) 
originColumn).getDataType());
+        } else {

Review Comment:
   good catch, you are right



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to