wuchong commented on a change in pull request #15755:
URL: https://github.com/apache/flink/pull/15755#discussion_r620825538



##########
File path: flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
##########
@@ -478,6 +481,17 @@ SqlAlterTable SqlAlterTable() :
                         tableIdentifier,
                         newTableIdentifier);
         }
+    |
+        <RENAME>
+            originColumnName = SimpleIdentifier()
+        <TO>
+            newColumnName = SimpleIdentifier()
+        {
+            return new SqlAlterTableRenameColumn(
+                    startPos.plus(getPos()),
+                    tableIdentifier,
+                    originColumnName,newColumnName);

Review comment:
       nit: Please put `originColumnName` and `newColumnName` on separate lines.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##########
@@ -144,6 +149,116 @@ public static Operation convertChangeColumn(
         // TODO: handle watermark and constraints
     }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable) {
+
+        Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();

Review comment:
       `modifiedTableSchema` sounds like the schema has already been modified; it would be better to call it `originSchema`.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##########
@@ -144,6 +149,116 @@ public static Operation convertChangeColumn(
         // TODO: handle watermark and constraints
     }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable) {
+
+        Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();
+        validateColumnName(originColumnName, newColumnName, modifiedTableSchema);
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        modifiedTableSchema.getColumns().stream()
+                .forEach(
+                        column -> {
+                            if (StringUtils.equals(column.getName(), originColumnName)) {
+                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                            } else {
+                                buildNewColumnFromOriginColumn(builder, column, column.getName());
+                            }
+                        });
+        // build primary key column
+        List<String> originPrimaryKeyNames =
+                modifiedTableSchema
+                        .getPrimaryKey()
+                        .map(Schema.UnresolvedPrimaryKey::getColumnNames)
+                        .orElseGet(Lists::newArrayList);
+
+        List<String> newPrimaryKeyNames =
+                originPrimaryKeyNames.stream()
+                        .map(
+                                pkName ->
+                                        StringUtils.equals(pkName, originColumnName)
+                                                ? newColumnName
+                                                : pkName)
+                        .collect(Collectors.toList());
+
+        if (newPrimaryKeyNames.size() > 0) {
+            builder.primaryKey(newPrimaryKeyNames);
+        }
+        // build watermark
+        modifiedTableSchema.getWatermarkSpecs().stream()
+                .forEach(
+                        watermarkSpec -> {
+                            String watermarkRefColumnName = watermarkSpec.getColumnName();
+                            Expression watermarkExpression = watermarkSpec.getWatermarkExpression();
+                            if (StringUtils.equals(watermarkRefColumnName, originColumnName)) {
+                                String newWatermarkExpression =
+                                        ((SqlCallExpression) watermarkExpression)
+                                                .getSqlExpression()
+                                                .replace(watermarkRefColumnName, newColumnName);

Review comment:
       1. We can't guarantee this is always a `SqlCallExpression`.
       2. We can't use String `replace` for renaming; it is very error-prone. For example, if the original expression is `f123 - f1` and `f1` is renamed to `f2`, the replaced result would be `f223 - f2`, which is wrong.

       If we don't have a good way to rewrite column names in expressions, we can disallow renaming columns that are referenced in expressions in the first version. A rough sketch of such a check is shown below.
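
For the first-version approach, a conservative validation could look roughly like this. This is only a sketch, not a concrete proposal: the helper name `validateNotReferencedByWatermark` and the coarse textual `contains` check are my assumptions, and it reuses the types already imported in `OperationConverterUtils`.

```java
// Sketch of the "disallow" approach: reject the rename when the column is
// referenced by any watermark declaration instead of rewriting the expression.
private static void validateNotReferencedByWatermark(
        Schema originSchema, String originColumnName) {
    originSchema
            .getWatermarkSpecs()
            .forEach(
                    watermarkSpec -> {
                        // the rowtime attribute itself is being renamed
                        boolean referenced =
                                originColumnName.equals(watermarkSpec.getColumnName());
                        Expression expression = watermarkSpec.getWatermarkExpression();
                        if (!referenced && expression instanceof SqlCallExpression) {
                            // coarse textual check: treat any occurrence of the name
                            // as a reference and reject, to stay on the safe side
                            referenced =
                                    ((SqlCallExpression) expression)
                                            .getSqlExpression()
                                            .contains(originColumnName);
                        }
                        if (referenced) {
                            throw new ValidationException(
                                    String.format(
                                            "Cannot rename column %s because it is referenced "
                                                    + "by the watermark declaration.",
                                            originColumnName));
                        }
                    });
}
```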

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##########
@@ -144,6 +149,116 @@ public static Operation convertChangeColumn(
         // TODO: handle watermark and constraints
     }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable) {
+
+        Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();
+        validateColumnName(originColumnName, newColumnName, modifiedTableSchema);
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        modifiedTableSchema.getColumns().stream()
+                .forEach(
+                        column -> {
+                            if (StringUtils.equals(column.getName(), originColumnName)) {
+                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                            } else {
+                                buildNewColumnFromOriginColumn(builder, column, column.getName());
+                            }
+                        });
+        // build primary key column
+        List<String> originPrimaryKeyNames =
+                modifiedTableSchema
+                        .getPrimaryKey()
+                        .map(Schema.UnresolvedPrimaryKey::getColumnNames)
+                        .orElseGet(Lists::newArrayList);
+
+        List<String> newPrimaryKeyNames =
+                originPrimaryKeyNames.stream()
+                        .map(
+                                pkName ->
+                                        StringUtils.equals(pkName, originColumnName)
+                                                ? newColumnName
+                                                : pkName)
+                        .collect(Collectors.toList());
+
+        if (newPrimaryKeyNames.size() > 0) {
+            builder.primaryKey(newPrimaryKeyNames);
+        }
+        // build watermark
+        modifiedTableSchema.getWatermarkSpecs().stream()
+                .forEach(
+                        watermarkSpec -> {
+                            String watermarkRefColumnName = watermarkSpec.getColumnName();
+                            Expression watermarkExpression = watermarkSpec.getWatermarkExpression();
+                            if (StringUtils.equals(watermarkRefColumnName, originColumnName)) {
+                                String newWatermarkExpression =
+                                        ((SqlCallExpression) watermarkExpression)
+                                                .getSqlExpression()
+                                                .replace(watermarkRefColumnName, newColumnName);
+                                builder.watermark(newColumnName, newWatermarkExpression);
+                            } else {
+                                builder.watermark(watermarkRefColumnName, watermarkExpression);
+                            }
+                        });
+        // build partition key
+        List<String> newPartitionKeys =
+                catalogTable.getPartitionKeys().stream()
+                        .map(
+                                name ->
+                                        StringUtils.equals(name, originColumnName)
+                                                ? newColumnName
+                                                : name)
+                        .collect(Collectors.toList());
+        // generate new schema
+        Schema newSchema = builder.build();
+
+        return new AlterTableSchemaOperation(
+                tableIdentifier,
+                CatalogTable.of(
+                        newSchema,
+                        catalogTable.getComment(),
+                        newPartitionKeys,
+                        catalogTable.getOptions()));
+    }
+
+    private static void validateColumnName(
+            String originColumnName, String newColumnName, Schema modifiedTableSchema) {
+        List<String> columnNames =
+                modifiedTableSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+
+        int originColumnIndex = columnNames.indexOf(originColumnName);
+        if (originColumnIndex < 0) {
+            throw new ValidationException(
+                    String.format("Old column %s not found for RENAME COLUMN ", originColumnName));
+        }
+
+        int sameColumnNameIndex = columnNames.indexOf(newColumnName);
+        if (sameColumnNameIndex >= 0) {
+            throw new ValidationException(
+                    String.format("New column %s existed for RENAME COLUMN ", newColumnName));
+        }
+    }
+
+    private static void buildNewColumnFromOriginColumn(
+            Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) {
+        if (originColumn instanceof Schema.UnresolvedComputedColumn) {
+            builder.columnByExpression(
+                    columnName, ((Schema.UnresolvedComputedColumn) originColumn).getExpression());
+        } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
+            builder.column(
+                    columnName, ((Schema.UnresolvedPhysicalColumn) originColumn).getDataType());
+        } else {
+            builder.columnByMetadata(
+                    columnName, ((Schema.UnresolvedMetadataColumn) originColumn).getDataType());

Review comment:
       This loses the optional metadata key and the `isVirtual` flag. A sketch of how to carry them over is shown below.
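
A sketch of how both could be preserved when rebuilding the column, assuming `Schema.Builder#columnByMetadata` has an overload that takes the metadata key and the virtual flag, and that `UnresolvedMetadataColumn` exposes `getMetadataKey()` / `isVirtual()`:

```java
// Sketch: keep the metadata key and the virtual flag instead of dropping them.
Schema.UnresolvedMetadataColumn metadataColumn =
        (Schema.UnresolvedMetadataColumn) originColumn;
builder.columnByMetadata(
        columnName,
        metadataColumn.getDataType(),
        metadataColumn.getMetadataKey(), // may be null if no explicit key was declared
        metadataColumn.isVirtual());     // preserves the VIRTUAL property
```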

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##########
@@ -144,6 +149,116 @@ public static Operation convertChangeColumn(
         // TODO: handle watermark and constraints
     }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable) {
+
+        Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();
+        validateColumnName(originColumnName, newColumnName, modifiedTableSchema);
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        modifiedTableSchema.getColumns().stream()
+                .forEach(
+                        column -> {
+                            if (StringUtils.equals(column.getName(), originColumnName)) {
+                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                            } else {
+                                buildNewColumnFromOriginColumn(builder, column, column.getName());
+                            }
+                        });
+        // build primary key column
+        List<String> originPrimaryKeyNames =
+                modifiedTableSchema
+                        .getPrimaryKey()
+                        .map(Schema.UnresolvedPrimaryKey::getColumnNames)
+                        .orElseGet(Lists::newArrayList);
+
+        List<String> newPrimaryKeyNames =
+                originPrimaryKeyNames.stream()
+                        .map(
+                                pkName ->
+                                        StringUtils.equals(pkName, originColumnName)
+                                                ? newColumnName
+                                                : pkName)
+                        .collect(Collectors.toList());
+
+        if (newPrimaryKeyNames.size() > 0) {
+            builder.primaryKey(newPrimaryKeyNames);

Review comment:
       This loses the `constraintName` of the primary key. A sketch that preserves it is shown below.
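
One way to keep it is to rebuild the primary key from the full `UnresolvedPrimaryKey` rather than from its column names only. A sketch, assuming `Schema.Builder#primaryKeyNamed` and `UnresolvedPrimaryKey#getConstraintName()` are available, and with `originSchema` being the table's unresolved schema (the variable suggested above):

```java
// Sketch: rename the primary key columns while keeping the original constraint name.
originSchema
        .getPrimaryKey()
        .ifPresent(
                pk -> {
                    List<String> newPrimaryKeyNames =
                            pk.getColumnNames().stream()
                                    .map(
                                            pkName ->
                                                    StringUtils.equals(pkName, originColumnName)
                                                            ? newColumnName
                                                            : pkName)
                                    .collect(Collectors.toList());
                    builder.primaryKeyNamed(pk.getConstraintName(), newPrimaryKeyNames);
                });
```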

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##########
@@ -144,6 +149,116 @@ public static Operation convertChangeColumn(
         // TODO: handle watermark and constraints
     }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable) {
+
+        Schema modifiedTableSchema = catalogTable.getUnresolvedSchema();
+        validateColumnName(originColumnName, newColumnName, modifiedTableSchema);
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        modifiedTableSchema.getColumns().stream()
+                .forEach(
+                        column -> {
+                            if (StringUtils.equals(column.getName(), originColumnName)) {
+                                buildNewColumnFromOriginColumn(builder, column, newColumnName);
+                            } else {
+                                buildNewColumnFromOriginColumn(builder, column, column.getName());
+                            }
+                        });
+        // build primary key column
+        List<String> originPrimaryKeyNames =
+                modifiedTableSchema
+                        .getPrimaryKey()
+                        .map(Schema.UnresolvedPrimaryKey::getColumnNames)
+                        .orElseGet(Lists::newArrayList);
+
+        List<String> newPrimaryKeyNames =
+                originPrimaryKeyNames.stream()
+                        .map(
+                                pkName ->
+                                        StringUtils.equals(pkName, originColumnName)
+                                                ? newColumnName
+                                                : pkName)
+                        .collect(Collectors.toList());
+
+        if (newPrimaryKeyNames.size() > 0) {
+            builder.primaryKey(newPrimaryKeyNames);
+        }
+        // build watermark
+        modifiedTableSchema.getWatermarkSpecs().stream()
+                .forEach(
+                        watermarkSpec -> {
+                            String watermarkRefColumnName = watermarkSpec.getColumnName();
+                            Expression watermarkExpression = watermarkSpec.getWatermarkExpression();
+                            if (StringUtils.equals(watermarkRefColumnName, originColumnName)) {
+                                String newWatermarkExpression =
+                                        ((SqlCallExpression) watermarkExpression)
+                                                .getSqlExpression()
+                                                .replace(watermarkRefColumnName, newColumnName);
+                                builder.watermark(newColumnName, newWatermarkExpression);
+                            } else {
+                                builder.watermark(watermarkRefColumnName, watermarkExpression);
+                            }
+                        });
+        // build partition key
+        List<String> newPartitionKeys =
+                catalogTable.getPartitionKeys().stream()
+                        .map(
+                                name ->
+                                        StringUtils.equals(name, originColumnName)
+                                                ? newColumnName
+                                                : name)
+                        .collect(Collectors.toList());
+        // generate new schema
+        Schema newSchema = builder.build();
+
+        return new AlterTableSchemaOperation(
+                tableIdentifier,
+                CatalogTable.of(
+                        newSchema,
+                        catalogTable.getComment(),
+                        newPartitionKeys,
+                        catalogTable.getOptions()));
+    }
+
+    private static void validateColumnName(
+            String originColumnName, String newColumnName, Schema modifiedTableSchema) {
+        List<String> columnNames =
+                modifiedTableSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+
+        int originColumnIndex = columnNames.indexOf(originColumnName);
+        if (originColumnIndex < 0) {
+            throw new ValidationException(
+                    String.format("Old column %s not found for RENAME COLUMN ", originColumnName));
+        }
+
+        int sameColumnNameIndex = columnNames.indexOf(newColumnName);
+        if (sameColumnNameIndex >= 0) {
+            throw new ValidationException(
+                    String.format("New column %s existed for RENAME COLUMN ", newColumnName));
+        }
+    }
+
+    private static void buildNewColumnFromOriginColumn(
+            Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) {
+        if (originColumn instanceof Schema.UnresolvedComputedColumn) {
+            builder.columnByExpression(
+                    columnName, ((Schema.UnresolvedComputedColumn) originColumn).getExpression());

Review comment:
       A computed column expression may also contain a reference to the column being renamed, so the same concern as for the watermark expression applies here. A conservative check is sketched below.
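
A first version could reject this case as well, mirroring the watermark check above. This sketch assumes `originColumnName` is in scope and uses the same coarse textual check, which is an assumption rather than a concrete proposal:

```java
// Sketch: disallow renaming a column that a computed column's SQL expression refers to.
if (originColumn instanceof Schema.UnresolvedComputedColumn) {
    Expression expression =
            ((Schema.UnresolvedComputedColumn) originColumn).getExpression();
    if (expression instanceof SqlCallExpression
            && ((SqlCallExpression) expression)
                    .getSqlExpression()
                    .contains(originColumnName)) {
        throw new ValidationException(
                String.format(
                        "Cannot rename column %s because it is referenced by "
                                + "computed column %s.",
                        originColumnName, originColumn.getName()));
    }
}
```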




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

