LadyForest commented on code in PR #20652: URL: https://github.com/apache/flink/pull/20652#discussion_r990147098
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterTableSchemaUtil.java: ########## @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations; + +import org.apache.flink.sql.parser.ddl.SqlAlterTableAdd; +import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema; +import org.apache.flink.sql.parser.ddl.SqlTableColumn; +import org.apache.flink.sql.parser.ddl.SqlWatermark; +import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; +import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.expressions.SqlCallExpression; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlDataTypeSpec; +import org.apache.calcite.sql.SqlIdentifier; 
+import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.util.NlsString; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** Util to convert {@link SqlAlterTableSchema} with source table to generate new {@link Schema}. */ +public class AlterTableSchemaUtil { + + private final SqlValidator sqlValidator; + private final Consumer<SqlTableConstraint> validateTableConstraint; + private final Function<SqlNode, String> escapeExpression; + + AlterTableSchemaUtil( + SqlValidator sqlValidator, + Function<SqlNode, String> escapeExpression, + Consumer<SqlTableConstraint> validateTableConstraint) { + this.sqlValidator = sqlValidator; + this.validateTableConstraint = validateTableConstraint; + this.escapeExpression = escapeExpression; + } + + public Schema convertSchema( + SqlAlterTableSchema alterTableSchema, ResolvedCatalogTable originalTable) { + UnresolvedSchemaBuilder builder = + new UnresolvedSchemaBuilder( + originalTable, + (FlinkTypeFactory) sqlValidator.getTypeFactory(), + sqlValidator, + validateTableConstraint, + escapeExpression); + AlterSchemaStrategy strategy = + alterTableSchema instanceof SqlAlterTableAdd + ? 
AlterSchemaStrategy.ADD + : AlterSchemaStrategy.MODIFY; + builder.addOrModifyColumns(strategy, alterTableSchema.getColumns().getList()); + List<SqlTableConstraint> fullConstraint = alterTableSchema.getFullConstraint(); + if (!fullConstraint.isEmpty()) { + builder.addOrModifyPrimaryKey(strategy, fullConstraint.get(0)); + } + alterTableSchema + .getWatermark() + .ifPresent(sqlWatermark -> builder.addOrModifyWatermarks(strategy, sqlWatermark)); + return builder.build(); + } + + private static class UnresolvedSchemaBuilder { + + List<String> newColumnNames = new ArrayList<>(); + Set<String> alterColumnNames = new HashSet<>(); + Map<String, Schema.UnresolvedColumn> columns = new HashMap<>(); + Map<String, Schema.UnresolvedWatermarkSpec> watermarkSpec = new HashMap<>(); + @Nullable Schema.UnresolvedPrimaryKey primaryKey = null; + + // Intermediate state + Map<String, RelDataType> physicalFieldNamesToTypes = new HashMap<>(); + Map<String, RelDataType> metadataFieldNamesToTypes = new HashMap<>(); + Map<String, RelDataType> computedFieldNamesToTypes = new HashMap<>(); + + Function<SqlNode, String> escapeExpressions; + FlinkTypeFactory typeFactory; + SqlValidator sqlValidator; + Consumer<SqlTableConstraint> validateTableConstraint; + + UnresolvedSchemaBuilder( + ResolvedCatalogTable sourceTable, + FlinkTypeFactory typeFactory, + SqlValidator sqlValidator, + Consumer<SqlTableConstraint> validateTableConstraint, + Function<SqlNode, String> escapeExpressions) { + this.typeFactory = typeFactory; + this.sqlValidator = sqlValidator; + this.escapeExpressions = escapeExpressions; + this.validateTableConstraint = validateTableConstraint; + populateColumnsFromSourceTable(sourceTable); + populatePrimaryKeyFromSourceTable(sourceTable.getUnresolvedSchema()); + populateWatermarksFromSourceTable(sourceTable.getUnresolvedSchema()); + } + + private void populateColumnsFromSourceTable(ResolvedCatalogTable sourceTable) { + List<DataType> types = 
sourceTable.getResolvedSchema().getColumnDataTypes(); + List<Schema.UnresolvedColumn> sourceColumns = + sourceTable.getUnresolvedSchema().getColumns(); + for (int i = 0; i < sourceColumns.size(); i++) { + Schema.UnresolvedColumn column = sourceColumns.get(i); + String columnName = column.getName(); + newColumnNames.add(columnName); + columns.put(columnName, column); + RelDataType type = + typeFactory.createFieldTypeFromLogicalType(types.get(i).getLogicalType()); + if (column instanceof Schema.UnresolvedPhysicalColumn) { + physicalFieldNamesToTypes.put(columnName, type); + } else if (column instanceof Schema.UnresolvedComputedColumn) { + computedFieldNamesToTypes.put(columnName, type); + } else if (column instanceof Schema.UnresolvedMetadataColumn) { + metadataFieldNamesToTypes.put(columnName, type); + } + } + } + + private void populatePrimaryKeyFromSourceTable(Schema sourceSchema) { + if (sourceSchema.getPrimaryKey().isPresent()) { + primaryKey = sourceSchema.getPrimaryKey().get(); + } + } + + private void populateWatermarksFromSourceTable(Schema sourceSchema) { + for (Schema.UnresolvedWatermarkSpec sourceWatermarkSpec : + sourceSchema.getWatermarkSpecs()) { + watermarkSpec.put(sourceWatermarkSpec.getColumnName(), sourceWatermarkSpec); + } + } + + private void addOrModifyColumns( + AlterSchemaStrategy strategy, List<SqlNode> alterColumnPositions) { + collectColumnPosition(strategy, alterColumnPositions); + for (SqlNode alterColumnPos : alterColumnPositions) { + SqlTableColumn alterColumn = ((SqlTableColumnPosition) alterColumnPos).getColumn(); + if (alterColumn instanceof SqlTableColumn.SqlComputedColumn) { + convertComputedColumn((SqlTableColumn.SqlComputedColumn) alterColumn); + } else { + convertNonComputedColumn(alterColumn); + } + } + } + + private void addOrModifyPrimaryKey( + AlterSchemaStrategy strategy, SqlTableConstraint alterPrimaryKey) { + validateTableConstraint.accept(alterPrimaryKey); + if (strategy == AlterSchemaStrategy.ADD) { + if (primaryKey != 
null) { + throw new ValidationException( + String.format( + "The base table already has a primary key %s. You might " + + "want to drop it before adding a new one.", + primaryKey.getColumnNames())); + } + } + List<String> primaryKeyColumns = new ArrayList<>(); + for (SqlNode primaryKeyNode : alterPrimaryKey.getColumns()) { + String primaryKey = ((SqlIdentifier) primaryKeyNode).getSimple(); + if (!columns.containsKey(primaryKey)) { + throw new ValidationException( + String.format( + "Primary key column '%s' is not defined in the schema at %s", + primaryKey, primaryKeyNode.getParserPosition())); + } + if (!(columns.get(primaryKey) instanceof Schema.UnresolvedPhysicalColumn)) { + throw new ValidationException( + String.format( + "Could not create a PRIMARY KEY with column '%s' at %s.\n" + + "A PRIMARY KEY constraint must be declared on physical columns.", + primaryKey, primaryKeyNode.getParserPosition())); + } + primaryKeyColumns.add(primaryKey); + } + if (alterColumnNames.isEmpty()) { + // a single add/modify constraint changes the nullability of columns implicitly + for (String primaryKeyColumn : primaryKeyColumns) { + RelDataType originalType = physicalFieldNamesToTypes.get(primaryKeyColumn); + DataType alterDataType = + TypeConversions.fromLogicalToDataType( + FlinkTypeFactory.toLogicalType( + typeFactory.createTypeWithNullability( + originalType, false))); + Schema.UnresolvedColumn column = columns.remove(primaryKeyColumn); + columns.put( + primaryKeyColumn, + new Schema.UnresolvedPhysicalColumn( + primaryKeyColumn, + alterDataType, + column.getComment().orElse(null))); + } + } + primaryKey = + new Schema.UnresolvedPrimaryKey( + alterPrimaryKey + .getConstraintName() + .orElseGet( + () -> + primaryKeyColumns.stream() + .collect( + Collectors.joining( + "_", "PK_", ""))), + primaryKeyColumns); + } + + private void addOrModifyWatermarks( + AlterSchemaStrategy strategy, SqlWatermark alterWatermarkSpec) { + SqlIdentifier eventTimeColumnName = 
alterWatermarkSpec.getEventTimeColumnName(); + + Map<String, RelDataType> nameToTypeMap = new HashMap<>(); + nameToTypeMap.putAll(physicalFieldNamesToTypes); + nameToTypeMap.putAll(metadataFieldNamesToTypes); + nameToTypeMap.putAll(computedFieldNamesToTypes); + + verifyRowtimeAttribute(strategy, eventTimeColumnName, nameToTypeMap); + String rowtimeAttribute = eventTimeColumnName.toString(); + + SqlNode expression = alterWatermarkSpec.getWatermarkStrategy(); + + // this will validate and expand function identifiers + SqlNode validated = + sqlValidator.validateParameterizedExpression(expression, nameToTypeMap); + watermarkSpec.clear(); + watermarkSpec.put( + rowtimeAttribute, + new Schema.UnresolvedWatermarkSpec( + rowtimeAttribute, + new SqlCallExpression(escapeExpressions.apply(validated)))); + } + + private void convertNonComputedColumn(SqlTableColumn column) { + boolean isPhysical = column instanceof SqlTableColumn.SqlRegularColumn; + String name = column.getName().getSimple(); + SqlDataTypeSpec typeSpec = + isPhysical + ? 
((SqlTableColumn.SqlRegularColumn) column).getType() + : ((SqlTableColumn.SqlMetadataColumn) column).getType(); + boolean nullable = typeSpec.getNullable() == null || typeSpec.getNullable(); + LogicalType logicalType = + FlinkTypeFactory.toLogicalType(typeSpec.deriveType(sqlValidator, nullable)); + DataType dataType = TypeConversions.fromLogicalToDataType(logicalType); + RelDataType relType = typeSpec.deriveType(sqlValidator, nullable); + Schema.UnresolvedColumn newColumn; + if (isPhysical) { + newColumn = new Schema.UnresolvedPhysicalColumn(name, dataType, getComment(column)); + physicalFieldNamesToTypes.put(name, relType); + } else { + newColumn = + new Schema.UnresolvedMetadataColumn( + name, + dataType, + ((SqlTableColumn.SqlMetadataColumn) column) + .getMetadataAlias() + .orElse(null), + ((SqlTableColumn.SqlMetadataColumn) column).isVirtual(), + getComment(column)); + metadataFieldNamesToTypes.put(name, relType); + } + columns.put(name, newColumn); + } + + private void convertComputedColumn(SqlTableColumn.SqlComputedColumn column) { + String name = column.getName().getSimple(); + Map<String, RelDataType> accessibleFieldNamesToTypes = new HashMap<>(); + accessibleFieldNamesToTypes.putAll(physicalFieldNamesToTypes); + accessibleFieldNamesToTypes.putAll(metadataFieldNamesToTypes); + + // computed column cannot be generated on another computed column + final SqlNode validatedExpr = + sqlValidator.validateParameterizedExpression( + column.getExpr(), accessibleFieldNamesToTypes); + final RelDataType validatedType = sqlValidator.getValidatedNodeType(validatedExpr); + + Schema.UnresolvedColumn newColumn = + new Schema.UnresolvedComputedColumn( + name, + new SqlCallExpression(escapeExpressions.apply(validatedExpr)), + getComment(column)); + computedFieldNamesToTypes.put(name, validatedType); + columns.put(name, newColumn); + } + + private void verifyRowtimeAttribute( + AlterSchemaStrategy strategy, + SqlIdentifier eventTimeColumnName, + Map<String, RelDataType> 
allFieldsTypes) { + if (!watermarkSpec.isEmpty() && strategy == AlterSchemaStrategy.ADD) { + throw new ValidationException( + String.format( + "There already exists a watermark spec for column '%s' in the base table. You " + + "might want to drop it before adding a new one.", + watermarkSpec.keySet())); + } + if (!eventTimeColumnName.isSimple()) {

Review Comment:

> We can support defining watermark on nested field, I think we should align the logic in
>
> https://github.com/apache/flink/blob/34de3989a613cf7124f9e301cb8284080f4df4ac/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java#L373

I don't think we truly support defining a watermark on a nested column. For example, the following DDL

```sql
CREATE TABLE foo(
  f0 INT,
  f1 STRING,
  f2 ROW<f21 STRING, f22 TIMESTAMP(3)>,
  WATERMARK FOR f2.f22 AS f2.f22
)
```

will throw a `ValidationException` because the schema resolver does not take the column type into account when validating the rowtime column; see
https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java#L250-L259

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
