fsk119 commented on code in PR #21571:
URL: https://github.com/apache/flink/pull/21571#discussion_r1059222276
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -115,77 +120,135 @@ public Schema applySchemaChange(SqlAlterTableSchema
alterTableSchema, Schema ori
}
public Schema applySchemaChange(
- SqlAlterTableRenameColumn renameColumn, ContextResolvedTable
originalTable) {
- String oldColumnName =
getColumnName(renameColumn.getOriginColumnIdentifier());
+ SqlAlterTableRenameColumn renameColumn, ContextResolvedTable
originTable) {
+ String originColumnName =
getColumnName(renameColumn.getOriginColumnIdentifier());
String newColumnName =
getColumnName(renameColumn.getNewColumnIdentifier());
List<String> tableColumns =
- originalTable.getResolvedSchema().getColumns().stream()
+ originTable.getResolvedSchema().getColumns().stream()
.map(Column::getName)
.collect(Collectors.toList());
- // validate old column is exists or new column isn't duplicated or old
column isn't
- // referenced by computed column
+ // validate origin column is exists, new column name does not collide
with existed column
+ // names, and origin column isn't referenced by computed column
validateColumnName(
- oldColumnName,
+ originColumnName,
newColumnName,
tableColumns,
- originalTable.getResolvedSchema(),
- ((CatalogTable)
originalTable.getResolvedTable()).getPartitionKeys());
+ originTable.getResolvedSchema(),
+ ((CatalogTable)
originTable.getResolvedTable()).getPartitionKeys());
+ validateWatermark(originTable, originColumnName, tableColumns);
- // validate old column isn't referenced by watermark
- List<WatermarkSpec> watermarkSpecs =
originalTable.getResolvedSchema().getWatermarkSpecs();
- watermarkSpecs.forEach(
- watermarkSpec -> {
- String rowtimeAttribute =
watermarkSpec.getRowtimeAttribute();
- Set<String> referencedColumns =
- ColumnReferenceFinder.findReferencedColumn(
- watermarkSpec.getWatermarkExpression(),
tableColumns);
- if (oldColumnName.equals(rowtimeAttribute)
- || referencedColumns.contains(oldColumnName)) {
- throw new ValidationException(
- String.format(
- "Old column %s is referred by
watermark expression %s, "
- + "currently doesn't allow to
rename column which is "
- + "referred by watermark
expression.",
- oldColumnName,
watermarkSpec.asSummaryString()));
+ // generate new schema
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ buildUpdatedColumn(
+ schemaBuilder,
+ originTable,
+ (builder, column) -> {
+ if (column.getName().equals(originColumnName)) {
+ buildNewColumnFromOriginColumn(builder, column,
newColumnName);
+ } else {
+ builder.fromColumns(Collections.singletonList(column));
}
});
+ buildUpdatedPrimaryKey(
+ schemaBuilder,
+ originTable,
+ (pk) -> pk.equals(originColumnName) ? newColumnName : pk);
+ buildUpdatedWatermark(schemaBuilder, originTable);
+ return schemaBuilder.build();
+ }
- Schema.Builder builder = Schema.newBuilder();
- // build column
- Schema originSchema = originalTable.getTable().getUnresolvedSchema();
- originSchema
- .getColumns()
+ public Schema applySchemaChange(
+ SqlAlterTableDropColumn dropColumn, ContextResolvedTable
originTable) {
+ List<String> tableColumns =
+ originTable.getResolvedSchema().getColumns().stream()
+ .map(Column::getName)
+ .collect(Collectors.toList());
+ Set<String> primaryKeys = new HashSet<>();
+ originTable
+ .getResolvedSchema()
+ .getPrimaryKey()
+ .ifPresent(pk -> primaryKeys.addAll(pk.getColumns()));
+ Set<String> columnsToDrop = new HashSet<>();
+ dropColumn
+ .getColumnList()
.forEach(
- column -> {
- if (oldColumnName.equals(column.getName())) {
- buildNewColumnFromOriginColumn(builder,
column, newColumnName);
- } else {
-
builder.fromColumns(Collections.singletonList(column));
+ identifier -> {
+ String name = getColumnName((SqlIdentifier)
identifier);
+ if (!columnsToDrop.add(name)) {
+ throw new ValidationException(
+ String.format(
+ "%sDuplicate column `%s`.",
EX_MSG_PREFIX, name));
}
});
- // build primary key
- Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey =
originSchema.getPrimaryKey();
- if (originPrimaryKey.isPresent()) {
- List<String> originPrimaryKeyNames =
originPrimaryKey.get().getColumnNames();
- String constrainName = originPrimaryKey.get().getConstraintName();
- List<String> newPrimaryKeyNames =
- originPrimaryKeyNames.stream()
- .map(pkName -> pkName.equals(oldColumnName) ?
newColumnName : pkName)
- .collect(Collectors.toList());
- builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
- }
-
- // build watermark
- originSchema
- .getWatermarkSpecs()
- .forEach(
- watermarkSpec ->
- builder.watermark(
- watermarkSpec.getColumnName(),
-
watermarkSpec.getWatermarkExpression()));
- // generate new schema
- return builder.build();
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ for (SqlNode columnIdentifier : dropColumn.getColumnList()) {
+ String columnToDrop = getColumnName((SqlIdentifier)
columnIdentifier);
+ // validate the column to drop exists in the table schema, is not
a primary key and
+ // does not derive any computed column
+ validateColumnName(
+ columnToDrop,
+ tableColumns,
+ originTable.getResolvedSchema(),
+ ((CatalogTable)
originTable.getResolvedTable()).getPartitionKeys(),
+ primaryKeys,
+ columnsToDrop);
+ validateWatermark(originTable, columnToDrop, tableColumns);
+ }
+ buildUpdatedColumn(
+ schemaBuilder,
+ originTable,
+ (builder, column) -> {
+ if (!columnsToDrop.contains(column.getName())) {
+ builder.fromColumns(Collections.singletonList(column));
+ }
+ });
+ buildUpdatedPrimaryKey(schemaBuilder, originTable, (pk) -> pk);
+ buildUpdatedWatermark(schemaBuilder, originTable);
+ return schemaBuilder.build();
+ }
+
+ public Schema applySchemaChange(
+ SqlAlterTableDropConstraint dropConstraint, ContextResolvedTable
originTable) {
+ Optional<UniqueConstraint> pkConstraint =
originTable.getResolvedSchema().getPrimaryKey();
+ if (!pkConstraint.isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "%sThe base table does not define any primary
key.", EX_MSG_PREFIX));
+ }
+ SqlIdentifier constraintIdentifier =
dropConstraint.getConstraintName();
+ String constraintName = pkConstraint.get().getName();
+ if (constraintIdentifier != null
+ && !constraintIdentifier.getSimple().equals(constraintName)) {
+ throw new ValidationException(
+ String.format(
+ "%sThe base table does not define a primary key
constraint named '%s'. "
+ + "Available constraint name: ['%s'].",
+ EX_MSG_PREFIX, constraintIdentifier.getSimple(),
constraintName));
+ }
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ buildUpdatedColumn(
+ schemaBuilder,
+ originTable,
+ (builder, column) ->
builder.fromColumns(Collections.singletonList(column)));
+ buildUpdatedWatermark(schemaBuilder, originTable);
+ return schemaBuilder.build();
+ }
+
+ public Schema applySchemaChange(ContextResolvedTable originTable) {
Review Comment:
The Javadoc is not as straightforward as the signature. It's fine to leave a
parameter unused. In `SqlToOperationConverter#convertBeginStatementSet`, we
also take a `SqlBeginStatementSet` as an input parameter that is not used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]