lsyldliu commented on code in PR #25880: URL: https://github.com/apache/flink/pull/25880#discussion_r1906542783
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.TableChange.MaterializedTableChange; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation; +import 
org.apache.flink.table.planner.operations.PlannerQueryOperation; + +import org.apache.calcite.sql.SqlNode; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** A converter for {@link SqlAlterMaterializedTableAsQuery}. */ +public class SqlAlterMaterializedTableAsQueryConverter + implements SqlNodeConverter<SqlAlterMaterializedTableAsQuery> { + + @Override + public Operation convertSqlNode( + SqlAlterMaterializedTableAsQuery sqlAlterMaterializedTableAsQuery, + ConvertContext context) { + ObjectIdentifier identifier = resolveIdentifier(sqlAlterMaterializedTableAsQuery, context); + + // Validate and extract schema from query + String originalQuery = + context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery()); + SqlNode validatedQuery = + context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery()); + // The LATERAL operator was eliminated during sql validation, thus the unparsed SQL + // does not contain LATERAL which is problematic, + // the issue was resolved in CALCITE-4077 + // (always treat the table function as implicitly LATERAL). 
+ String definitionQuery = context.expandSqlIdentifiers(originalQuery); + PlannerQueryOperation queryOperation = + new PlannerQueryOperation( + context.toRelRoot(validatedQuery).project(), () -> originalQuery); + + ResolvedCatalogMaterializedTable oldTable = + getResolvedMaterializedTable(context, identifier); + List<Column> addedColumns = + validateAndExtractNewColumns( + oldTable.getResolvedSchema(), queryOperation.getResolvedSchema()); + + // Build new materialized table and apply changes + CatalogMaterializedTable updatedTable = + buildUpdatedMaterializedTable(oldTable, addedColumns, definitionQuery); + List<MaterializedTableChange> tableChanges = new ArrayList<>(); + addedColumns.forEach(column -> tableChanges.add(TableChange.add(column))); + tableChanges.add(TableChange.modifyDefinitionQuery(definitionQuery)); + + return new AlterMaterializedTableAsQueryOperation(identifier, tableChanges, updatedTable); + } + + private ObjectIdentifier resolveIdentifier( + SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlAlterTableAsQuery.fullTableName()); + return context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + } + + private ResolvedCatalogMaterializedTable getResolvedMaterializedTable( + ConvertContext context, ObjectIdentifier identifier) { + ResolvedCatalogBaseTable<?> baseTable = + context.getCatalogManager().getTableOrError(identifier).getResolvedTable(); + if (MATERIALIZED_TABLE != baseTable.getTableKind()) { + throw new ValidationException( + "Only materialized table support modify definition query."); + } + return (ResolvedCatalogMaterializedTable) baseTable; + } + + private CatalogMaterializedTable buildUpdatedMaterializedTable( + ResolvedCatalogMaterializedTable oldTable, + List<Column> addedColumns, + String definitionQuery) { + + Schema.Builder newSchemaBuilder = + Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema()); + 
addedColumns.forEach(col -> newSchemaBuilder.column(col.getName(), col.getDataType())); + + return CatalogMaterializedTable.newBuilder() + .schema(newSchemaBuilder.build()) + .comment(oldTable.getComment()) + .partitionKeys(oldTable.getPartitionKeys()) + .options(oldTable.getOptions()) + .definitionQuery(definitionQuery) + .freshness(oldTable.getDefinitionFreshness()) + .logicalRefreshMode(oldTable.getLogicalRefreshMode()) + .refreshMode(oldTable.getRefreshMode()) + .refreshStatus(oldTable.getRefreshStatus()) + .refreshHandlerDescription(oldTable.getRefreshHandlerDescription().orElse(null)) + .serializedRefreshHandler(oldTable.getSerializedRefreshHandler()) + .build(); + } + + private List<Column> validateAndExtractNewColumns( + ResolvedSchema oldSchema, ResolvedSchema newSchema) { + List<Column> newAddedColumns = new ArrayList<>(); + int originalColumnSize = oldSchema.getColumns().size(); + int newColumnSize = newSchema.getColumns().size(); + + if (originalColumnSize > newColumnSize) { + throw new ValidationException( + String.format( + "Query modification failed: Column deletion is not allowed. " Review Comment: Failed to modify query because drop column is unsupported. When modifying a query, you can only append new columns at the end of original schema. The original schema has %d columns, but the newly derived schema from the query has %d columns. 
########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java: ########## @@ -99,9 +88,13 @@ private String toString(TableChange.MaterializedTableChange tableChange) { return String.format( " MODIFY REFRESH HANDLER DESCRIPTION TO '%s'", refreshHandler.getRefreshHandlerDesc()); + } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) { + TableChange.ModifyDefinitionQuery definitionQuery = + (TableChange.ModifyDefinitionQuery) tableChange; + return String.format( + " MODIFY DEFINITION QUERY TO '%s'", definitionQuery.getDefinitionQuery()); } else { - throw new UnsupportedOperationException( - String.format("Unknown materialized table change: %s.", tableChange)); + return tableChange.toString(); Review Comment: I think if the MaterializedTableChange subclass is also a TableChange subclass, we can utilize the AlterTableChangeOperation.toString method to print it. Otherwise, throw an UnsupportedOperationException. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.TableChange.MaterializedTableChange; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation; +import org.apache.flink.table.planner.operations.PlannerQueryOperation; + +import org.apache.calcite.sql.SqlNode; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** A converter for {@link SqlAlterMaterializedTableAsQuery}. 
*/ +public class SqlAlterMaterializedTableAsQueryConverter + implements SqlNodeConverter<SqlAlterMaterializedTableAsQuery> { + + @Override + public Operation convertSqlNode( + SqlAlterMaterializedTableAsQuery sqlAlterMaterializedTableAsQuery, + ConvertContext context) { + ObjectIdentifier identifier = resolveIdentifier(sqlAlterMaterializedTableAsQuery, context); + + // Validate and extract schema from query + String originalQuery = + context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery()); + SqlNode validatedQuery = + context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery()); + // The LATERAL operator was eliminated during sql validation, thus the unparsed SQL + // does not contain LATERAL which is problematic, + // the issue was resolved in CALCITE-4077 + // (always treat the table function as implicitly LATERAL). + String definitionQuery = context.expandSqlIdentifiers(originalQuery); + PlannerQueryOperation queryOperation = + new PlannerQueryOperation( + context.toRelRoot(validatedQuery).project(), () -> originalQuery); + + ResolvedCatalogMaterializedTable oldTable = + getResolvedMaterializedTable(context, identifier); + List<Column> addedColumns = + validateAndExtractNewColumns( + oldTable.getResolvedSchema(), queryOperation.getResolvedSchema()); + + // Build new materialized table and apply changes + CatalogMaterializedTable updatedTable = + buildUpdatedMaterializedTable(oldTable, addedColumns, definitionQuery); + List<MaterializedTableChange> tableChanges = new ArrayList<>(); + addedColumns.forEach(column -> tableChanges.add(TableChange.add(column))); + tableChanges.add(TableChange.modifyDefinitionQuery(definitionQuery)); + + return new AlterMaterializedTableAsQueryOperation(identifier, tableChanges, updatedTable); + } + + private ObjectIdentifier resolveIdentifier( + SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = + 
UnresolvedIdentifier.of(sqlAlterTableAsQuery.fullTableName()); + return context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + } + + private ResolvedCatalogMaterializedTable getResolvedMaterializedTable( + ConvertContext context, ObjectIdentifier identifier) { + ResolvedCatalogBaseTable<?> baseTable = + context.getCatalogManager().getTableOrError(identifier).getResolvedTable(); + if (MATERIALIZED_TABLE != baseTable.getTableKind()) { + throw new ValidationException( + "Only materialized table support modify definition query."); + } + return (ResolvedCatalogMaterializedTable) baseTable; + } + + private CatalogMaterializedTable buildUpdatedMaterializedTable( + ResolvedCatalogMaterializedTable oldTable, + List<Column> addedColumns, + String definitionQuery) { + + Schema.Builder newSchemaBuilder = + Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema()); + addedColumns.forEach(col -> newSchemaBuilder.column(col.getName(), col.getDataType())); + + return CatalogMaterializedTable.newBuilder() + .schema(newSchemaBuilder.build()) + .comment(oldTable.getComment()) + .partitionKeys(oldTable.getPartitionKeys()) + .options(oldTable.getOptions()) + .definitionQuery(definitionQuery) + .freshness(oldTable.getDefinitionFreshness()) + .logicalRefreshMode(oldTable.getLogicalRefreshMode()) + .refreshMode(oldTable.getRefreshMode()) + .refreshStatus(oldTable.getRefreshStatus()) + .refreshHandlerDescription(oldTable.getRefreshHandlerDescription().orElse(null)) + .serializedRefreshHandler(oldTable.getSerializedRefreshHandler()) + .build(); + } + + private List<Column> validateAndExtractNewColumns( + ResolvedSchema oldSchema, ResolvedSchema newSchema) { + List<Column> newAddedColumns = new ArrayList<>(); + int originalColumnSize = oldSchema.getColumns().size(); + int newColumnSize = newSchema.getColumns().size(); + + if (originalColumnSize > newColumnSize) { + throw new ValidationException( + String.format( + "Query modification failed: Column deletion is not 
allowed. " + + "When modifying a query, you can only append new columns at the end. " + + "Original schema has %d columns, but new schema has %d columns.", + originalColumnSize, newColumnSize)); + } + + for (int i = 0; i < oldSchema.getColumns().size(); i++) { + Column oldColumn = oldSchema.getColumns().get(i); + Column newColumn = newSchema.getColumns().get(i); + if (!oldColumn.equals(newColumn)) { + throw new ValidationException( + String.format( + "When modifying the query of a materialized table, currently only support appending columns at the end of the schema, dropping, renaming and reordering columns are not supported.\n" Review Comment: ```suggestion "When modifying the query of a materialized table, currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" ``` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java: ########## @@ -391,6 +451,127 @@ void testAlterMaterializedTableResume() { .isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME WITH (k1: [v1])"); } + @Test + void testAlterMaterializedTableAsQuery() throws TableNotExistException { + String sql = + "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, d as e, cast('123' as string) as f FROM t3"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(AlterMaterializedTableAsQueryOperation.class); + + AlterMaterializedTableAsQueryOperation op = + (AlterMaterializedTableAsQueryOperation) operation; + assertThat(op.getTableChanges()) + .isEqualTo( + Arrays.asList( + TableChange.add( + Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))), + TableChange.add( + Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))), + TableChange.modifyDefinitionQuery( + "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" + + "FROM 
`builtin`.`default`.`t3`"))); + assertThat(operation.asSummaryString()) + .isEqualTo( + "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" + + "FROM `builtin`.`default`.`t3`"); + + // new table only difference schema & definition query with old table. + CatalogMaterializedTable oldTable = + (CatalogMaterializedTable) + catalog.getTable( + new ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl")); + CatalogMaterializedTable newTable = op.getNewMaterializedTable(); + + assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema()); + assertThat(oldTable.getUnresolvedSchema().getPrimaryKey()) + .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey()); + assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs()) + .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs()); + assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery()); + assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); + assertThat(oldTable.getRefreshMode()).isEqualTo(newTable.getRefreshMode()); + assertThat(oldTable.getRefreshStatus()).isEqualTo(newTable.getRefreshStatus()); + assertThat(oldTable.getSerializedRefreshHandler()) + .isEqualTo(newTable.getSerializedRefreshHandler()); + + List<Schema.UnresolvedColumn> addedColumn = + newTable.getUnresolvedSchema().getColumns().stream() + .filter( + column -> + !oldTable.getUnresolvedSchema() + .getColumns() + .contains(column)) + .collect(Collectors.toList()); + // added column should be a nullable column. 
+ assertThat(addedColumn) + .isEqualTo( + Arrays.asList( + new Schema.UnresolvedPhysicalColumn( + "e", DataTypes.VARCHAR(Integer.MAX_VALUE)), + new Schema.UnresolvedPhysicalColumn( + "f", DataTypes.VARCHAR(Integer.MAX_VALUE)))); + } + + @Test + void testAlterMaterializedTableAsQueryWithConflictColumnName() { + String sql5 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, c as a FROM t3"; Review Comment: In next pr, we should forbid the duplicated name case explicitly. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +809,160 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable oldMaterializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) { + // directly apply the alter operation + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, op.getTableChanges(), op.getNewMaterializedTable()); + return operationExecutor.callExecutableOperation( + handle, alterMaterializedTableChangeOperation); + } + + if (CatalogMaterializedTable.RefreshStatus.ACTIVATED + == oldMaterializedTable.getRefreshStatus()) { + // 1. suspend the materialized table + CatalogMaterializedTable suspendMaterializedTable = + suspendContinuousRefreshJob( + operationExecutor, handle, tableIdentifier, oldMaterializedTable); + + // 2. 
alter materialized table schema & query definition + CatalogMaterializedTable updatedMaterializedTable = + op.getNewMaterializedTable() + .copy( + suspendMaterializedTable.getRefreshStatus(), + suspendMaterializedTable + .getRefreshHandlerDescription() + .orElse(null), + suspendMaterializedTable.getSerializedRefreshHandler()); + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, op.getTableChanges(), updatedMaterializedTable); + operationExecutor.callExecutableOperation( + handle, alterMaterializedTableChangeOperation); + + // 3. resume the materialized table + try { + executeContinuousRefreshJob( + operationExecutor, + handle, + updatedMaterializedTable, + tableIdentifier, + Collections.emptyMap(), + Optional.empty()); + } catch (Exception e) { + // Roll back the changes to the materialized table and restore the continuous + // refresh job + LOG.warn( + "Failed to resume the continuous refresh job for materialized table {}, rollback the alter materialized table as query operation.", + tableIdentifier, + e); + + AlterMaterializedTableChangeOperation rollbackChangeOperation = + generateRollbackAlterMaterializedTableOperation( + suspendMaterializedTable, alterMaterializedTableChangeOperation); + operationExecutor.callExecutableOperation(handle, rollbackChangeOperation); + + ContinuousRefreshHandler continuousRefreshHandler = Review Comment: I think the ContinuousRefreshHandler should come from `suspendMaterializedTable` or `updatedMaterializedTable` because we should resume the refresh job from the savepoint. 
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +809,160 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable oldMaterializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) { Review Comment: I think we forgot one important piece of logic: explain the select query and ensure it can compile successfully in the corresponding refresh mode when creating and modifying a query. We should do more pre-check actions before creating or altering a query. Can you help create a Jira issue to track it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
