LadyForest commented on code in PR #19329:
URL: https://github.com/apache/flink/pull/19329#discussion_r1058243819
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:
##########
@@ -1258,6 +1259,94 @@ public void testAlterTable() throws Exception {
.hasMessageContaining("ALTER TABLE RESET does not support
empty key");
}
+ @Test
+ public void testAlterTableRenameColumn() throws Exception {
+ prepareTable("tb1", false, false, true, 3);
+ // rename pk column c
+ Operation operation = parse("alter table tb1 rename c to c1");
+ assert operation instanceof AlterTableSchemaOperation;
Review Comment:
`assert` can be turned off.
```suggestion
assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
```
##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -330,6 +330,10 @@ void testAlterTable() {
final String sql3 = "alter table t1 drop constraint ct1";
final String expected3 = "ALTER TABLE `T1` DROP CONSTRAINT `CT1`";
sql(sql3).ok(expected3);
+
+ final String sql4 = "alter table t1 rename a to b";
+ final String expected4 = "ALTER TABLE `T1` RENAME `A` TO `B`";
+ sql(sql4).ok(expected4);
Review Comment:
Add a case to cover the compound identifier.
```java
sql("alter table t1 rename a.x to a.y").ok("ALTER TABLE `T1` RENAME `A`.`X`
TO `A`.`Y`");
```
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -99,6 +107,82 @@ public Schema applySchemaChange(SqlAlterTableSchema
alterTableSchema, Schema ori
return converter.convert();
}
+ public Schema applySchemaChange(
+ SqlAlterTableRenameColumn renameColumn, ContextResolvedTable
originalTable) {
+ String oldColumnName =
getColumnName(renameColumn.getOriginColumnIdentifier());
+ String newColumnName =
getColumnName(renameColumn.getNewColumnIdentifier());
+ List<String> tableColumns =
+ originalTable.getResolvedSchema().getColumns().stream()
+ .map(Column::getName)
+ .collect(Collectors.toList());
+ // validate old column is exists or new column isn't duplicated or old
column isn't
+ // referenced by computed column
+ validateColumnName(
+ oldColumnName,
+ newColumnName,
+ tableColumns,
+ originalTable.getResolvedSchema(),
+ ((CatalogTable)
originalTable.getResolvedTable()).getPartitionKeys());
+
+ // validate old column isn't referenced by watermark
+ List<WatermarkSpec> watermarkSpecs =
originalTable.getResolvedSchema().getWatermarkSpecs();
+ watermarkSpecs.forEach(
+ watermarkSpec -> {
+ String rowtimeAttribute =
watermarkSpec.getRowtimeAttribute();
+ Set<String> referencedColumns =
+ ColumnReferenceFinder.findReferencedColumn(
+ watermarkSpec.getWatermarkExpression(),
tableColumns);
+ if (oldColumnName.equals(rowtimeAttribute)
+ || referencedColumns.contains(oldColumnName)) {
+ throw new ValidationException(
+ String.format(
+ "Old column %s is referred by
watermark expression %s, "
+ + "currently doesn't allow to
rename column which is "
+ + "referred by watermark
expression.",
+ oldColumnName,
watermarkSpec.asSummaryString()));
+ }
+ });
+
+ Schema.Builder builder = Schema.newBuilder();
+ // build column
+ Schema originSchema = originalTable.getTable().getUnresolvedSchema();
+ originSchema
+ .getColumns()
+ .forEach(
+ column -> {
+ if (oldColumnName.equals(column.getName())) {
+ buildNewColumnFromOriginColumn(builder,
column, newColumnName);
Review Comment:
It seems the column comment is omitted.
```suggestion
buildNewColumnFromOriginColumn(builder,
column, newColumnName);
builder.withComment(column.getName());
```
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:
##########
@@ -2670,7 +2776,7 @@ private void prepareTable(
Schema.newBuilder()
.column("a", DataTypes.INT().notNull())
.column("b", DataTypes.BIGINT().notNull())
- .column("c", DataTypes.STRING())
+ .column("c", DataTypes.STRING().notNull())
Review Comment:
Why change `c`'s nullability?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -417,6 +495,84 @@ void checkColumnExists(String columnName) {
}
}
+ //
--------------------------------------------------------------------------------------------
+
+ private void validateColumnName(
+ String originColumnName,
+ String newColumnName,
+ List<String> tableColumns,
+ ResolvedSchema originResolvedSchema,
+ List<String> partitionKeys) {
+ // validate old column
+ if (!tableColumns.contains(originColumnName)) {
+ throw new ValidationException(
+ String.format(
+ "Old column %s not found in table schema for
RENAME COLUMN",
+ originColumnName));
+ }
+
+ // validate new column
+ if (tableColumns.contains(newColumnName)) {
+ throw new ValidationException(
+ String.format(
+ "New column %s already existed in table schema for
RENAME COLUMN",
+ newColumnName));
+ }
+
+ // validate old column name isn't referred by computed column case
+ originResolvedSchema.getColumns().stream()
+ .filter(column -> column instanceof Column.ComputedColumn)
+ .forEach(
+ column -> {
+ Column.ComputedColumn computedColumn =
(Column.ComputedColumn) column;
+ Set<String> referencedColumn =
+ ColumnReferenceFinder.findReferencedColumn(
+ computedColumn.getExpression(),
tableColumns);
+ if (referencedColumn.contains(originColumnName)) {
+ throw new ValidationException(
+ String.format(
+ "Old column %s is referred by
computed column %s, currently doesn't "
+ + "allow to rename
column which is referred by computed column.",
+ originColumnName,
+
computedColumn.asSummaryString()));
+ }
+ });
+ // validate partition keys doesn't contain the old column
+ if (partitionKeys.contains(originColumnName)) {
+ throw new ValidationException(
+ String.format(
+ "Can not rename column %s because it is used as
the partition keys.",
+ originColumnName));
+ }
+ }
+
+ private void buildNewColumnFromOriginColumn(
+ Schema.Builder builder, Schema.UnresolvedColumn originColumn,
String columnName) {
+ if (originColumn instanceof Schema.UnresolvedComputedColumn) {
+ builder.columnByExpression(
+ columnName, ((Schema.UnresolvedComputedColumn)
originColumn).getExpression());
+ } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
+ builder.column(
+ columnName, ((Schema.UnresolvedPhysicalColumn)
originColumn).getDataType());
+ } else if (originColumn instanceof Schema.UnresolvedMetadataColumn) {
+ Schema.UnresolvedMetadataColumn metadataColumn =
+ (Schema.UnresolvedMetadataColumn) originColumn;
+ builder.columnByMetadata(
+ columnName,
+ metadataColumn.getDataType(),
+ metadataColumn.getMetadataKey(),
+ metadataColumn.isVirtual());
+ }
+ }
+
+ private static String getColumnName(SqlIdentifier identifier) {
+ if (!identifier.isSimple()) {
+ throw new UnsupportedOperationException(
+ String.format("Alter nested row type %s is not supported
yet.", identifier));
Review Comment:
Nit: add `EX_MSG_PREFIX` to align exception message type?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]