luoyuxia commented on code in PR #19329:
URL: https://github.com/apache/flink/pull/19329#discussion_r847279810
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -144,6 +147,150 @@ public static Operation convertChangeColumn(
// TODO: handle watermark and constraints
}
+ public static Operation convertRenameColumn(
+ ObjectIdentifier tableIdentifier,
+ String originColumnName,
+ String newColumnName,
+ CatalogTable catalogTable,
+ ResolvedSchema originResolveSchema) {
+ Schema originSchema = catalogTable.getUnresolvedSchema();
+ List<String> tableColumns =
+ originSchema.getColumns().stream()
+ .map(Schema.UnresolvedColumn::getName)
+ .collect(Collectors.toList());
+ // validate old column is exists or new column is duplicated or old
column is
+ // referenced by computed column
+ validateColumnName(originColumnName, newColumnName, tableColumns,
originResolveSchema);
+
+ // validate old column is referenced by watermark
+ List<org.apache.flink.table.catalog.WatermarkSpec> watermarkSpecs =
+ originResolveSchema.getWatermarkSpecs();
+ watermarkSpecs.forEach(
+ watermarkSpec -> {
+ String rowtimeAttribute =
watermarkSpec.getRowtimeAttribute();
+ Set<String> referencedColumns =
+ ColumnReferenceFinder.findReferencedColumn(
+ watermarkSpec.getWatermarkExpression(),
tableColumns);
+ if (originColumnName.equals(rowtimeAttribute)
+ || referencedColumns.contains(originColumnName)) {
+ throw new ValidationException(
+ String.format(
+ "Old column %s is referenced by
watermark expression %s, "
+ + "currently doesn't allow to
rename column which is "
+ + "referenced by watermark
expression.",
+ originColumnName,
watermarkSpec.asSummaryString()));
+ }
+ });
+
+ Schema.Builder builder = Schema.newBuilder();
+ // build column
+ originSchema
+ .getColumns()
+ .forEach(
+ column -> {
+ if (originColumnName.equals(column.getName())) {
+ buildNewColumnFromOriginColumn(builder,
column, newColumnName);
+ } else {
+ buildNewColumnFromOriginColumn(builder,
column, column.getName());
+ }
+ });
+ // build primary key
+ Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey =
originSchema.getPrimaryKey();
+ if (originPrimaryKey.isPresent()) {
+ List<String> originPrimaryKeyNames =
originPrimaryKey.get().getColumnNames();
+ String constrainName = originPrimaryKey.get().getConstraintName();
+ List<String> newPrimaryKeyNames =
+ originPrimaryKeyNames.stream()
+ .map(pkName -> pkName.equals(originColumnName) ?
newColumnName : pkName)
+ .collect(Collectors.toList());
+ builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
+ }
+
+ // build watermark
+ originSchema
+ .getWatermarkSpecs()
+ .forEach(
+ watermarkSpec ->
+ builder.watermark(
+ watermarkSpec.getColumnName(),
+
watermarkSpec.getWatermarkExpression()));
+
+ // build partition key
+ List<String> newPartitionKeys =
+ catalogTable.getPartitionKeys().stream()
+ .map(name -> name.equals(originColumnName) ?
newColumnName : name)
+ .collect(Collectors.toList());
+
+ // generate new schema
+ return new AlterTableSchemaOperation(
+ tableIdentifier,
+ CatalogTable.of(
+ builder.build(),
+ catalogTable.getComment(),
+ newPartitionKeys,
+ catalogTable.getOptions()));
+ }
+
+ private static void validateColumnName(
+ String originColumnName,
+ String newColumnName,
+ List<String> tableColumns,
+ ResolvedSchema originResolvedSchema) {
+ // validate old column
+ if (!tableColumns.contains(originColumnName)) {
+ throw new ValidationException(
+ String.format(
+ "Old column %s not found in table schema for
RENAME COLUMN",
+ originColumnName));
+ }
+
+ // validate new column
+ if (tableColumns.contains(newColumnName)) {
+ throw new ValidationException(
+ String.format(
+ "New column %s already existed in table schema for
RENAME COLUMN",
+ newColumnName));
+ }
+
+ // validate old column name is referenced by computed column case
+ originResolvedSchema.getColumns().stream()
+ .filter(column -> column instanceof Column.ComputedColumn)
+ .forEach(
+ column -> {
+ Column.ComputedColumn computedColumn =
(Column.ComputedColumn) column;
+ Set<String> referencedColumn =
+ ColumnReferenceFinder.findReferencedColumn(
+ computedColumn.getExpression(),
tableColumns);
+ if (referencedColumn.contains(originColumnName)) {
+ throw new ValidationException(
+ String.format(
+ "Old column %s is referenced
by computed column %s, currently doesn't "
+ + "allow to rename
column which is referenced by computed column.",
+ originColumnName,
+
computedColumn.asSummaryString()));
+ }
+ });
+ }
+
+ private static void buildNewColumnFromOriginColumn(
+ Schema.Builder builder, Schema.UnresolvedColumn originColumn,
String columnName) {
+ if (originColumn instanceof Schema.UnresolvedComputedColumn) {
+ builder.columnByExpression(
+ columnName, ((Schema.UnresolvedComputedColumn)
originColumn).getExpression());
+ } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
+ builder.column(
+ columnName, ((Schema.UnresolvedPhysicalColumn)
originColumn).getDataType());
+ } else if (originColumn instanceof Schema.UnresolvedMetadataColumn) {
+ Schema.UnresolvedMetadataColumn metadataColumn =
+ (Schema.UnresolvedMetadataColumn) originColumn;
+ builder.columnByMetadata(
+ columnName,
+ metadataColumn.getDataType(),
+ metadataColumn.getMetadataKey(),
+ metadataColumn.isVirtual());
+ }
Review Comment:
Should we adding an else statement code block to throw exception in here so
that the caller can know it and do the adaption.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:
##########
@@ -1269,6 +1271,112 @@ public void testAlterTable() throws Exception {
.hasMessageContaining("ALTER TABLE RESET does not support
empty key");
}
+ @Test
+ public void testAlterTableRenameColumn() throws Exception {
+ Catalog catalog = new GenericInMemoryCatalog("default", "default");
+ catalogManager.registerCatalog("cat1", catalog);
+ functionCatalog.registerTempCatalogScalarFunction(
+ ObjectIdentifier.of("cat1", "default", "my_udf1"),
Func0$.MODULE$);
+
+ catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(),
null), true);
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ Schema.newBuilder()
+ .column("a", DataTypes.STRING().notNull())
+ .column("b", DataTypes.INT().notNull())
+ .column("c", DataTypes.INT())
+ .column("d", DataTypes.INT())
+ .column("e", DataTypes.STRING())
+ .column("f", DataTypes.TIMESTAMP(3).notNull())
+ .columnByExpression("h", "c - 1")
+ .columnByExpression("i",
"cat1.`default`.my_udf1(d) + 1")
+ .columnByMetadata("m", DataTypes.INT(), true)
+ .columnByExpression("g", "TO_TIMESTAMP(e)")
+ .watermark("f", "g - INTERVAL '5' SECOND")
+ .primaryKey("a", "b")
+ .build(),
+ "tb1",
+ Collections.emptyList(),
+ Collections.emptyMap());
+ catalogManager.setCurrentCatalog("cat1");
+ catalogManager.setCurrentDatabase("db1");
+ catalog.createTable(new ObjectPath("db1", "tb1"), catalogTable, true);
+ // Test alter table rename
+ // rename column b
+ Operation operation = parse("alter table tb1 rename b to b1",
SqlDialect.DEFAULT);
+ assert operation instanceof AlterTableSchemaOperation;
+ Schema actual =
+ ((AlterTableSchemaOperation)
operation).getCatalogTable().getUnresolvedSchema();
+ Schema expected =
+ Schema.newBuilder()
+ .column("a", DataTypes.STRING().notNull())
+ .column("b1", DataTypes.INT().notNull())
+ .column("c", DataTypes.INT())
+ .column("d", DataTypes.INT())
+ .column("e", DataTypes.STRING())
+ .column("f", DataTypes.TIMESTAMP(3).notNull())
+ .columnByExpression("h", "c - 1")
+ .columnByExpression("i", "cat1.`default`.my_udf1(d) +
1")
+ .columnByMetadata("m", DataTypes.INT(), true)
+ .columnByExpression("g", "TO_TIMESTAMP(e)")
+ .watermark("f", "g - INTERVAL '5' SECOND")
+ .primaryKeyNamed("PK_a_b", "a", "b1")
+ .build();
+ assertThat(expected).isEqualTo(actual);
+
+ // rename column c test computed column case1
+ assertThatThrownBy(() -> parse("alter table tb1 rename c to c1",
SqlDialect.DEFAULT))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Old column c is referenced by computed column `h` INT
AS c - 1, "
+ + "currently doesn't allow to rename column
which is referenced by computed column.");
+
+ // rename column d test computed column case2
+ assertThatThrownBy(() -> parse("alter table tb1 rename d to d1",
SqlDialect.DEFAULT))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Old column d is referenced by computed column `i` INT
NOT NULL AS cat1.`default`.my_udf1(d) + 1,"
+ + " currently doesn't allow to rename column
which is referenced by computed column.");
+
+ // rename column e test computed column case3
+ CatalogTable catalogTable2 =
Review Comment:
Can we test the computed column case3 with `tb1` so that the code can be
more readable.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -144,6 +147,150 @@ public static Operation convertChangeColumn(
// TODO: handle watermark and constraints
}
+ public static Operation convertRenameColumn(
+ ObjectIdentifier tableIdentifier,
+ String originColumnName,
+ String newColumnName,
+ CatalogTable catalogTable,
+ ResolvedSchema originResolveSchema) {
+ Schema originSchema = catalogTable.getUnresolvedSchema();
+ List<String> tableColumns =
+ originSchema.getColumns().stream()
+ .map(Schema.UnresolvedColumn::getName)
+ .collect(Collectors.toList());
+ // validate old column is exists or new column is duplicated or old
column is
Review Comment:
Nit:
```suggestion
// validate old column is exists or new column isn't duplicated or
old column is
```
And the following comments with `validate` should also change in same way.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]