luoyuxia commented on code in PR #19329:
URL: https://github.com/apache/flink/pull/19329#discussion_r847279810


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -144,6 +147,150 @@ public static Operation convertChangeColumn(
         // TODO: handle watermark and constraints
     }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable,
+            ResolvedSchema originResolveSchema) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        List<String> tableColumns =
+                originSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+        // validate old column is exists or new column is duplicated or old 
column is
+        // referenced by computed column
+        validateColumnName(originColumnName, newColumnName, tableColumns, 
originResolveSchema);
+
+        // validate old column is referenced by watermark
+        List<org.apache.flink.table.catalog.WatermarkSpec> watermarkSpecs =
+                originResolveSchema.getWatermarkSpecs();
+        watermarkSpecs.forEach(
+                watermarkSpec -> {
+                    String rowtimeAttribute = 
watermarkSpec.getRowtimeAttribute();
+                    Set<String> referencedColumns =
+                            ColumnReferenceFinder.findReferencedColumn(
+                                    watermarkSpec.getWatermarkExpression(), 
tableColumns);
+                    if (originColumnName.equals(rowtimeAttribute)
+                            || referencedColumns.contains(originColumnName)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Old column %s is referenced by 
watermark expression %s, "
+                                                + "currently doesn't allow to 
rename column which is "
+                                                + "referenced by watermark 
expression.",
+                                        originColumnName, 
watermarkSpec.asSummaryString()));
+                    }
+                });
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        originSchema
+                .getColumns()
+                .forEach(
+                        column -> {
+                            if (originColumnName.equals(column.getName())) {
+                                buildNewColumnFromOriginColumn(builder, 
column, newColumnName);
+                            } else {
+                                buildNewColumnFromOriginColumn(builder, 
column, column.getName());
+                            }
+                        });
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = 
originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            List<String> originPrimaryKeyNames = 
originPrimaryKey.get().getColumnNames();
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> newPrimaryKeyNames =
+                    originPrimaryKeyNames.stream()
+                            .map(pkName -> pkName.equals(originColumnName) ? 
newColumnName : pkName)
+                            .collect(Collectors.toList());
+            builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
+        }
+
+        // build watermark
+        originSchema
+                .getWatermarkSpecs()
+                .forEach(
+                        watermarkSpec ->
+                                builder.watermark(
+                                        watermarkSpec.getColumnName(),
+                                        
watermarkSpec.getWatermarkExpression()));
+
+        // build partition key
+        List<String> newPartitionKeys =
+                catalogTable.getPartitionKeys().stream()
+                        .map(name -> name.equals(originColumnName) ? 
newColumnName : name)
+                        .collect(Collectors.toList());
+
+        // generate new schema
+        return new AlterTableSchemaOperation(
+                tableIdentifier,
+                CatalogTable.of(
+                        builder.build(),
+                        catalogTable.getComment(),
+                        newPartitionKeys,
+                        catalogTable.getOptions()));
+    }
+
+    private static void validateColumnName(
+            String originColumnName,
+            String newColumnName,
+            List<String> tableColumns,
+            ResolvedSchema originResolvedSchema) {
+        // validate old column
+        if (!tableColumns.contains(originColumnName)) {
+            throw new ValidationException(
+                    String.format(
+                            "Old column %s not found in table schema for 
RENAME COLUMN",
+                            originColumnName));
+        }
+
+        // validate new column
+        if (tableColumns.contains(newColumnName)) {
+            throw new ValidationException(
+                    String.format(
+                            "New column %s already existed in table schema for 
RENAME COLUMN",
+                            newColumnName));
+        }
+
+        // validate old column name is referenced by computed column case
+        originResolvedSchema.getColumns().stream()
+                .filter(column -> column instanceof Column.ComputedColumn)
+                .forEach(
+                        column -> {
+                            Column.ComputedColumn computedColumn = 
(Column.ComputedColumn) column;
+                            Set<String> referencedColumn =
+                                    ColumnReferenceFinder.findReferencedColumn(
+                                            computedColumn.getExpression(), 
tableColumns);
+                            if (referencedColumn.contains(originColumnName)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Old column %s is referenced 
by computed column %s, currently doesn't "
+                                                        + "allow to rename 
column which is referenced by computed column.",
+                                                originColumnName,
+                                                
computedColumn.asSummaryString()));
+                            }
+                        });
+    }
+
+    private static void buildNewColumnFromOriginColumn(
+            Schema.Builder builder, Schema.UnresolvedColumn originColumn, 
String columnName) {
+        if (originColumn instanceof Schema.UnresolvedComputedColumn) {
+            builder.columnByExpression(
+                    columnName, ((Schema.UnresolvedComputedColumn) 
originColumn).getExpression());
+        } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
+            builder.column(
+                    columnName, ((Schema.UnresolvedPhysicalColumn) 
originColumn).getDataType());
+        } else if (originColumn instanceof Schema.UnresolvedMetadataColumn) {
+            Schema.UnresolvedMetadataColumn metadataColumn =
+                    (Schema.UnresolvedMetadataColumn) originColumn;
+            builder.columnByMetadata(
+                    columnName,
+                    metadataColumn.getDataType(),
+                    metadataColumn.getMetadataKey(),
+                    metadataColumn.isVirtual());
+        }

Review Comment:
   Should we adding an else statement code block to throw exception in here so 
that the caller can know it and do the adaption.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:
##########
@@ -1269,6 +1271,112 @@ public void testAlterTable() throws Exception {
                 .hasMessageContaining("ALTER TABLE RESET does not support 
empty key");
     }
 
+    @Test
+    public void testAlterTableRenameColumn() throws Exception {
+        Catalog catalog = new GenericInMemoryCatalog("default", "default");
+        catalogManager.registerCatalog("cat1", catalog);
+        functionCatalog.registerTempCatalogScalarFunction(
+                ObjectIdentifier.of("cat1", "default", "my_udf1"), 
Func0$.MODULE$);
+
+        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), 
null), true);
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.STRING().notNull())
+                                .column("b", DataTypes.INT().notNull())
+                                .column("c", DataTypes.INT())
+                                .column("d", DataTypes.INT())
+                                .column("e", DataTypes.STRING())
+                                .column("f", DataTypes.TIMESTAMP(3).notNull())
+                                .columnByExpression("h", "c - 1")
+                                .columnByExpression("i", 
"cat1.`default`.my_udf1(d) + 1")
+                                .columnByMetadata("m", DataTypes.INT(), true)
+                                .columnByExpression("g", "TO_TIMESTAMP(e)")
+                                .watermark("f", "g - INTERVAL '5' SECOND")
+                                .primaryKey("a", "b")
+                                .build(),
+                        "tb1",
+                        Collections.emptyList(),
+                        Collections.emptyMap());
+        catalogManager.setCurrentCatalog("cat1");
+        catalogManager.setCurrentDatabase("db1");
+        catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
+        // Test alter table rename
+        // rename column b
+        Operation operation = parse("alter table tb1 rename b to b1", 
SqlDialect.DEFAULT);
+        assert operation instanceof AlterTableSchemaOperation;
+        Schema actual =
+                ((AlterTableSchemaOperation) 
operation).getCatalogTable().getUnresolvedSchema();
+        Schema expected =
+                Schema.newBuilder()
+                        .column("a", DataTypes.STRING().notNull())
+                        .column("b1", DataTypes.INT().notNull())
+                        .column("c", DataTypes.INT())
+                        .column("d", DataTypes.INT())
+                        .column("e", DataTypes.STRING())
+                        .column("f", DataTypes.TIMESTAMP(3).notNull())
+                        .columnByExpression("h", "c - 1")
+                        .columnByExpression("i", "cat1.`default`.my_udf1(d) + 
1")
+                        .columnByMetadata("m", DataTypes.INT(), true)
+                        .columnByExpression("g", "TO_TIMESTAMP(e)")
+                        .watermark("f", "g - INTERVAL '5' SECOND")
+                        .primaryKeyNamed("PK_a_b", "a", "b1")
+                        .build();
+        assertThat(expected).isEqualTo(actual);
+
+        // rename column c test computed column case1
+        assertThatThrownBy(() -> parse("alter table tb1 rename c to c1", 
SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Old column c is referenced by computed column `h` INT 
AS c - 1, "
+                                + "currently doesn't allow to rename column 
which is referenced by computed column.");
+
+        // rename column d test computed column case2
+        assertThatThrownBy(() -> parse("alter table tb1 rename d to d1", 
SqlDialect.DEFAULT))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Old column d is referenced by computed column `i` INT 
NOT NULL AS cat1.`default`.my_udf1(d) + 1,"
+                                + " currently doesn't allow to rename column 
which is referenced by computed column.");
+
+        // rename column e test computed column case3
+        CatalogTable catalogTable2 =

Review Comment:
   Can we test the computed column case3 with `tb1` so that the code can be 
more readable.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -144,6 +147,150 @@ public static Operation convertChangeColumn(
         // TODO: handle watermark and constraints
     }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable,
+            ResolvedSchema originResolveSchema) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        List<String> tableColumns =
+                originSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+        // validate old column is exists or new column is duplicated or old 
column is

Review Comment:
   Nit:
   ```suggestion
           // validate old column is exists or new column isn't duplicated or 
old column is
   ```
   And the following comments with `validate` should also change in same way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to