LadyForest commented on code in PR #20652:
URL: https://github.com/apache/flink/pull/20652#discussion_r1005830753
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterTableSchemaUtil.java:
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableModify;
+import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlWatermark;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Util to convert {@link SqlAlterTableSchema} with source table to generate new {@link Schema}. */
+public class AlterTableSchemaUtil {
+
+    private final SqlValidator sqlValidator;
+    private final Consumer<SqlTableConstraint> validateTableConstraint;
+    private final Function<SqlNode, String> escapeExpression;
+
+    AlterTableSchemaUtil(
+            SqlValidator sqlValidator,
+            Function<SqlNode, String> escapeExpression,
+            Consumer<SqlTableConstraint> validateTableConstraint) {
+        this.sqlValidator = sqlValidator;
+        this.validateTableConstraint = validateTableConstraint;
+        this.escapeExpression = escapeExpression;
+    }
+
+    public Schema convertSchema(
+            SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable originalTable) {
+        UnresolvedSchemaBuilder builder =
+                new UnresolvedSchemaBuilder(
+                        originalTable,
+                        (FlinkTypeFactory) sqlValidator.getTypeFactory(),
+                        sqlValidator,
+                        validateTableConstraint,
+                        escapeExpression);
+        AlterSchemaStrategy strategy = computeAlterSchemaStrategy(alterTableSchema);
+        List<SqlNode> columnPositions = alterTableSchema.getColumnPositions().getList();
+        builder.addOrModifyColumns(strategy, columnPositions);
+        alterTableSchema
+                .getWatermark()
+                .ifPresent(sqlWatermark -> builder.addOrModifyWatermarks(strategy, sqlWatermark));
+        alterTableSchema
+                .getFullConstraint()
+                .ifPresent(
+                        (pk) ->
+                                builder.addOrModifyPrimaryKey(
+                                        strategy, pk, columnPositions.isEmpty()));
+        return builder.build();
+    }
+
+    private static class UnresolvedSchemaBuilder {
+
+        List<String> sortedColumnNames = new ArrayList<>();
+        Map<String, Schema.UnresolvedColumn> columns = new HashMap<>();
+        Map<String, Schema.UnresolvedWatermarkSpec> watermarkSpecs = new HashMap<>();
+        @Nullable Schema.UnresolvedPrimaryKey primaryKey = null;
+
+        // Intermediate state
+        Map<String, RelDataType> physicalFieldNamesToTypes = new HashMap<>();
+        Map<String, RelDataType> metadataFieldNamesToTypes = new HashMap<>();
+        Map<String, RelDataType> computedFieldNamesToTypes = new HashMap<>();
+
+        Function<SqlNode, String> escapeExpressions;
+        FlinkTypeFactory typeFactory;
+        SqlValidator sqlValidator;
+        Consumer<SqlTableConstraint> validateTableConstraint;
+
+        UnresolvedSchemaBuilder(
+                ResolvedCatalogTable sourceTable,
+                FlinkTypeFactory typeFactory,
+                SqlValidator sqlValidator,
+                Consumer<SqlTableConstraint> validateTableConstraint,
+                Function<SqlNode, String> escapeExpressions) {
+            this.typeFactory = typeFactory;
+            this.sqlValidator = sqlValidator;
+            this.escapeExpressions = escapeExpressions;
+            this.validateTableConstraint = validateTableConstraint;
+            populateColumnsFromSourceTable(sourceTable);
+            populatePrimaryKeyFromSourceTable(sourceTable.getUnresolvedSchema());
+            populateWatermarksFromSourceTable(sourceTable.getUnresolvedSchema());
+        }
+
+        private void populateColumnsFromSourceTable(ResolvedCatalogTable sourceTable) {
+            List<DataType> types = sourceTable.getResolvedSchema().getColumnDataTypes();
+            List<Schema.UnresolvedColumn> sourceColumns =
+                    sourceTable.getUnresolvedSchema().getColumns();
+            for (int i = 0; i < sourceColumns.size(); i++) {
+                Schema.UnresolvedColumn column = sourceColumns.get(i);
+                String columnName = column.getName();
+                sortedColumnNames.add(columnName);
+                columns.put(columnName, column);
+                RelDataType type =
+                        typeFactory.createFieldTypeFromLogicalType(types.get(i).getLogicalType());
+                if (column instanceof Schema.UnresolvedPhysicalColumn) {
+                    physicalFieldNamesToTypes.put(columnName, type);
+                } else if (column instanceof Schema.UnresolvedComputedColumn) {
+                    computedFieldNamesToTypes.put(columnName, type);
+                } else if (column instanceof Schema.UnresolvedMetadataColumn) {
+                    metadataFieldNamesToTypes.put(columnName, type);
+                }
+            }
+        }
+
+        private void populatePrimaryKeyFromSourceTable(Schema sourceSchema) {
+            if (sourceSchema.getPrimaryKey().isPresent()) {
+                primaryKey = sourceSchema.getPrimaryKey().get();
+            }
+        }
+
+        private void populateWatermarksFromSourceTable(Schema sourceSchema) {
+            for (Schema.UnresolvedWatermarkSpec sourceWatermarkSpec :
+                    sourceSchema.getWatermarkSpecs()) {
+                watermarkSpecs.put(sourceWatermarkSpec.getColumnName(), sourceWatermarkSpec);
+            }
+        }
+
+        private void addOrModifyColumns(
+                AlterSchemaStrategy strategy, List<SqlNode> alterColumnPositions) {
+            collectColumnPosition(strategy, alterColumnPositions);
+            for (SqlNode alterColumnPos : alterColumnPositions) {
+                SqlTableColumn alterColumn = ((SqlTableColumnPosition) alterColumnPos).getColumn();
+                if (alterColumn instanceof SqlTableColumn.SqlComputedColumn) {
+                    convertComputedColumn((SqlTableColumn.SqlComputedColumn) alterColumn);
+                } else {
+                    convertNonComputedColumn(alterColumn);
+                }
+            }
+        }
+
+        private void addOrModifyPrimaryKey(
+                AlterSchemaStrategy strategy,
+                SqlTableConstraint alterPrimaryKey,
+                boolean withoutAlterColumn) {
+            validateTableConstraint.accept(alterPrimaryKey);
+            if (strategy == AlterSchemaStrategy.ADD && primaryKey != null) {
+                throw new ValidationException(
+                        String.format(
+                                "The base table already has a primary key %s. You might "
+                                        + "want to drop it before adding a new one.",
+                                primaryKey.getColumnNames()));
+            } else if (strategy == AlterSchemaStrategy.MODIFY && primaryKey == null) {
+                throw new ValidationException(
+                        "The base table does not define any primary key. You might "
+                                + "want to add a new one.");
+            }
+            List<String> primaryKeyColumns = new ArrayList<>();
+            for (SqlNode primaryKeyNode : alterPrimaryKey.getColumns()) {
+                String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple();
+                if (!columns.containsKey(primaryKey)) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Primary key column '%s' is not defined in the schema at %s",
+                                    primaryKey, primaryKeyNode.getParserPosition()));
+                }
+                if (!(columns.get(primaryKey) instanceof Schema.UnresolvedPhysicalColumn)) {
+                    throw new ValidationException(
+                            String.format(
+                                    "Could not create a PRIMARY KEY with column '%s' at %s.\n"
+                                            + "A PRIMARY KEY constraint must be declared on physical columns.",
+                                    primaryKey, primaryKeyNode.getParserPosition()));
+                }
+                primaryKeyColumns.add(primaryKey);
+            }
+            if (withoutAlterColumn) {
+                // a single add/modify constraint changes the nullability of columns implicitly
+                for (String primaryKeyColumn : primaryKeyColumns) {
+                    RelDataType originalType = physicalFieldNamesToTypes.get(primaryKeyColumn);
+                    DataType alterDataType =
+                            TypeConversions.fromLogicalToDataType(
+                                    FlinkTypeFactory.toLogicalType(
+                                            typeFactory.createTypeWithNullability(
+                                                    originalType, false)));
+                    Schema.UnresolvedColumn column = columns.remove(primaryKeyColumn);
+                    columns.put(
+                            primaryKeyColumn,
+                            column(
+                                    primaryKeyColumn,
+                                    alterDataType,
+                                    column.getComment().orElse(null)));
+                }
+            }
+            primaryKey =
+                    primaryKeyNamed(
+                            alterPrimaryKey
+                                    .getConstraintName()
+                                    .orElseGet(
+                                            () ->
+                                                    primaryKeyColumns.stream()
+                                                            .collect(
+                                                                    Collectors.joining(
+                                                                            "_", "PK_", ""))),
+                            primaryKeyColumns);
+        }
+
+        private void addOrModifyWatermarks(
+                AlterSchemaStrategy strategy, SqlWatermark alterWatermarkSpec) {
+            SqlIdentifier eventTimeColumnName = alterWatermarkSpec.getEventTimeColumnName();
+
+            Map<String, RelDataType> nameToTypeMap = new HashMap<>();
+            nameToTypeMap.putAll(physicalFieldNamesToTypes);
+            nameToTypeMap.putAll(metadataFieldNamesToTypes);
+            nameToTypeMap.putAll(computedFieldNamesToTypes);
+
+            verifyRowtimeAttribute(strategy, eventTimeColumnName, nameToTypeMap);
+            String rowtimeAttribute = eventTimeColumnName.toString();
+
+            SqlNode expression = alterWatermarkSpec.getWatermarkStrategy();
+
+            // this will validate and expand function identifiers
+            SqlNode validated =
+                    sqlValidator.validateParameterizedExpression(expression, nameToTypeMap);
+            watermarkSpecs.clear();
+            watermarkSpecs.put(
+                    rowtimeAttribute,
+                    watermarkSpec(
+                            rowtimeAttribute,
+                            new SqlCallExpression(escapeExpressions.apply(validated))));
+        }
+
+        private void convertNonComputedColumn(SqlTableColumn column) {
+            boolean isPhysical = column instanceof SqlTableColumn.SqlRegularColumn;
+            String name = column.getName().getSimple();
+            SqlDataTypeSpec typeSpec =
+                    isPhysical
+                            ? ((SqlTableColumn.SqlRegularColumn) column).getType()
+                            : ((SqlTableColumn.SqlMetadataColumn) column).getType();
+            boolean nullable = typeSpec.getNullable() == null || typeSpec.getNullable();
+            LogicalType logicalType =
+                    FlinkTypeFactory.toLogicalType(typeSpec.deriveType(sqlValidator, nullable));
+            DataType dataType = TypeConversions.fromLogicalToDataType(logicalType);
+            RelDataType relType = typeSpec.deriveType(sqlValidator, nullable);
+            Schema.UnresolvedColumn newColumn;
+            if (isPhysical) {
+                newColumn = column(name, dataType, getComment(column));
+                physicalFieldNamesToTypes.put(name, relType);
+            } else {
+                newColumn =
+                        columnByMetadata(
+                                name,
+                                dataType,
+                                ((SqlTableColumn.SqlMetadataColumn) column)
+                                        .getMetadataAlias()
+                                        .orElse(null),
+                                ((SqlTableColumn.SqlMetadataColumn) column).isVirtual(),
+                                getComment(column));
+                metadataFieldNamesToTypes.put(name, relType);
+            }
+            columns.put(name, newColumn);
+        }
+
+        private void convertComputedColumn(SqlTableColumn.SqlComputedColumn column) {
+            String name = column.getName().getSimple();
+            Map<String, RelDataType> accessibleFieldNamesToTypes = new HashMap<>();
+            accessibleFieldNamesToTypes.putAll(physicalFieldNamesToTypes);
+            accessibleFieldNamesToTypes.putAll(metadataFieldNamesToTypes);
+
+            // computed column cannot be generated on another computed column
+            final SqlNode validatedExpr =
+                    sqlValidator.validateParameterizedExpression(
+                            column.getExpr(), accessibleFieldNamesToTypes);
+            final RelDataType validatedType = sqlValidator.getValidatedNodeType(validatedExpr);
+
+            Schema.UnresolvedColumn newColumn =
+                    columnByExpression(
+                            name, escapeExpressions.apply(validatedExpr), getComment(column));
+            computedFieldNamesToTypes.put(name, validatedType);
+            columns.put(name, newColumn);
+        }
+
+        private void verifyRowtimeAttribute(
+                AlterSchemaStrategy strategy,
+                SqlIdentifier eventTimeColumnName,
+                Map<String, RelDataType> allFieldsTypes) {
+            if (!watermarkSpecs.isEmpty() && strategy == AlterSchemaStrategy.ADD) {
+                throw new ValidationException(
+                        String.format(
+                                "There already exists a watermark spec for column '%s' in the base table. You "
+                                        + "might want to drop it before adding a new one.",
+                                watermarkSpecs.keySet()));
+            } else if (watermarkSpecs.isEmpty() && strategy == AlterSchemaStrategy.MODIFY) {
+                throw new ValidationException(
+                        "There is no watermark defined in the base table. You might "
+                                + "want to add a new one.");
+            }
+            if (!eventTimeColumnName.isSimple()) {
+                throw new ValidationException(
+                        String.format(
+                                "The nested rowtime attribute field '%s' cannot define a watermark.",
+                                eventTimeColumnName));
+            }
+            String rowtimeField = eventTimeColumnName.getSimple();
+            List<String> components = eventTimeColumnName.names;
+            if (!allFieldsTypes.containsKey(components.get(0))) {
+                throw new ValidationException(
+                        String.format(
+                                "The rowtime attribute field '%s' is not defined in the table schema, at %s\n"
+                                        + "Available fields: [%s]",
+                                rowtimeField,
+                                eventTimeColumnName.getParserPosition(),
+                                allFieldsTypes.keySet().stream()
+                                        .collect(Collectors.joining("', '", "'", "'"))));
+            }
+        }
+
+        @Nullable
+        private String getComment(SqlTableColumn column) {
+            return column.getComment()
+                    .map(SqlCharStringLiteral.class::cast)
+                    .map(c -> c.getValueAs(String.class))
+                    .orElse(null);
+        }
+
+        private void collectColumnPosition(
+                AlterSchemaStrategy strategy, List<SqlNode> alterColumns) {
+            for (SqlNode alterColumn : alterColumns) {
+                assert alterColumn instanceof SqlTableColumnPosition;
+                SqlTableColumnPosition columnPosition = (SqlTableColumnPosition) alterColumn;
+                SqlTableColumn column = columnPosition.getColumn();
+                String name = column.getName().getSimple();
+                boolean existed = sortedColumnNames.contains(name);
+                boolean first = columnPosition.isFirstColumn();
+                boolean after = columnPosition.isAfterReferencedColumn();
+                if (strategy == AlterSchemaStrategy.ADD) {
+                    if (existed) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Try to add a column '%s' which already exists in the table.",
+                                        name));
+                    }
+                    if (first) {
+                        sortedColumnNames.add(0, name);
+                    } else if (after) {
+                        getReferencedColumn(
+                                        columnPosition,
+                                        (refName) -> sortedColumnNames.contains(refName))
+                                .ifPresent(
+                                        (refCol) ->
+                                                sortedColumnNames.add(
+                                                        sortedColumnNames.indexOf(refCol) + 1,
+                                                        name));
+                    } else {
+                        sortedColumnNames.add(name);
+                    }
+                } else if (strategy == AlterSchemaStrategy.MODIFY) {
+                    if (!existed) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Try to modify a column '%s' which does not exist in the table.",
+                                        name));
+                    }
+                    if (first) {
+                        sortedColumnNames.remove(name);
+                        sortedColumnNames.add(0, name);
+                    } else if (after) {
+                        getReferencedColumn(
+                                        columnPosition, (refName) -> columns.containsKey(refName))
+                                .ifPresent(
+                                        (refCol) -> {
+                                            sortedColumnNames.remove(name);
+                                            sortedColumnNames.add(
+                                                    sortedColumnNames.indexOf(refCol) + 1, name);
+                                        });
+                    }
+                }
+            }
+        }
+
+        private Optional<String> getReferencedColumn(
+                SqlTableColumnPosition columnPosition, Function<String, Boolean> existFn) {
+            assert columnPosition.getAfterReferencedColumn() != null;
+            String referencedName = columnPosition.getAfterReferencedColumn().getSimple();
+            if (!existFn.apply(referencedName)) {
+                throw new ValidationException(
+                        String.format(
+                                "Referenced column '%s' by 'AFTER' does not exist in the table.",
+                                referencedName));
+            }
+            return Optional.of(referencedName);
+        }
+
+        private Schema.UnresolvedColumn column(
+                String columnName, AbstractDataType<?> dataType, @Nullable String comment) {
+            return Schema.newBuilder()
+                    .column(columnName, dataType)
+                    .withComment(comment)
+                    .build()
+                    .getColumns()
+                    .get(0);

Review Comment:
   > Replace with constructor?

   I think there is no public constructor for `UnresolvedPhysicalColumn`, and we had a discussion about this https://github.com/apache/flink/pull/20652#discussion_r976475200.
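   For readers following the constructor question, here is a minimal, self-contained sketch of the builder round-trip that the `column(...)` helper in the diff performs. The column name, data type, and comment below are made up for illustration; only the `Schema.newBuilder()` / `column(...)` / `withComment(...)` / `build()` / `getColumns()` chain comes from the quoted code:

   ```java
   import org.apache.flink.table.api.DataTypes;
   import org.apache.flink.table.api.Schema;

   public class UnresolvedColumnSketch {
       public static void main(String[] args) {
           // Schema.UnresolvedPhysicalColumn exposes no public constructor, so the helper
           // builds a throwaway single-column Schema and extracts that one column.
           Schema.UnresolvedColumn column =
                   Schema.newBuilder()
                           .column("user_id", DataTypes.BIGINT().notNull()) // hypothetical column
                           .withComment("hypothetical comment")             // may also be null
                           .build()
                           .getColumns()
                           .get(0);
           System.out.println(column);
       }
   }
   ```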
