This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9260b33cc5109c6dd2042a8f0ce645b6d56d4d57 Author: Feng Jin <jinfeng1...@gmail.com> AuthorDate: Sun Jan 5 20:05:38 2025 +0800 [FLINK-36994][table] Support converting ALTER MATERIALIZED TABLE AS node to operation --- .../AlterMaterializedTableAsQueryOperation.java | 66 +++++++++ .../AlterMaterializedTableChangeOperation.java | 28 ++-- .../apache/flink/table/catalog/TableChange.java | 93 ++++++++++-- .../SqlAlterMaterializedTableAsQueryConverter.java | 161 +++++++++++++++++++++ .../operations/converters/SqlNodeConverters.java | 1 + ...erializedTableNodeToOperationConverterTest.java | 143 ++++++++++++++++++ 6 files changed, 461 insertions(+), 31 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java new file mode 100644 index 00000000000..331e8cd0220 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.TableChange.MaterializedTableChange; + +import java.util.List; + +/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */ +@Internal +public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation { + + private final List<MaterializedTableChange> tableChanges; + + private final CatalogMaterializedTable newMaterializedTable; + + public AlterMaterializedTableAsQueryOperation( + ObjectIdentifier tableIdentifier, + List<MaterializedTableChange> tableChanges, + CatalogMaterializedTable newMaterializedTable) { + super(tableIdentifier); + this.tableChanges = tableChanges; + this.newMaterializedTable = newMaterializedTable; + } + + public List<MaterializedTableChange> getTableChanges() { + return tableChanges; + } + + public CatalogMaterializedTable getNewMaterializedTable() { + return newMaterializedTable; + } + + @Override + public TableResultInternal execute(Context ctx) { + throw new UnsupportedOperationException( + "AlterMaterializedTableAsQueryOperation doesn't support ExecutableOperation yet."); + } + + @Override + public String asSummaryString() { + return String.format( + "ALTER MATERIALIZED TABLE %s AS %s", + tableIdentifier.asSummaryString(), newMaterializedTable.getDefinitionQuery()); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java index 49f220a8ddc..d71946c7a56 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java @@ -24,6 +24,7 @@ import org.apache.flink.table.api.internal.TableResultInternal; import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.TableChange.MaterializedTableChange; import org.apache.flink.table.operations.ddl.AlterTableChangeOperation; import java.util.List; @@ -35,19 +36,19 @@ import java.util.stream.Collectors; @Internal public class AlterMaterializedTableChangeOperation extends AlterMaterializedTableOperation { - private final List<TableChange> tableChanges; + private final List<MaterializedTableChange> tableChanges; private final CatalogMaterializedTable catalogMaterializedTable; public AlterMaterializedTableChangeOperation( ObjectIdentifier tableIdentifier, - List<TableChange> tableChanges, + List<MaterializedTableChange> tableChanges, CatalogMaterializedTable catalogMaterializedTable) { super(tableIdentifier); this.tableChanges = tableChanges; this.catalogMaterializedTable = catalogMaterializedTable; } - public List<TableChange> getTableChanges() { + public List<MaterializedTableChange> getTableChanges() { return tableChanges; } @@ -72,22 +73,13 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl public String asSummaryString() { String changes = tableChanges.stream() - .map( - tableChange -> { - if (tableChange - instanceof TableChange.MaterializedTableChange) { - return toString( - (TableChange.MaterializedTableChange) tableChange); - } else { - return AlterTableChangeOperation.toString(tableChange); - } - }) + .map(AlterMaterializedTableChangeOperation::toString) .collect(Collectors.joining(",\n")); return String.format( "ALTER MATERIALIZED TABLE %s\n%s", tableIdentifier.asSummaryString(), changes); } - private String toString(TableChange.MaterializedTableChange tableChange) { + private static String toString(MaterializedTableChange tableChange) { if (tableChange instanceof TableChange.ModifyRefreshStatus) { TableChange.ModifyRefreshStatus refreshStatus = (TableChange.ModifyRefreshStatus) tableChange; @@ -99,9 +91,13 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl return String.format( " MODIFY REFRESH HANDLER DESCRIPTION TO '%s'", refreshHandler.getRefreshHandlerDesc()); + } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) { + TableChange.ModifyDefinitionQuery definitionQuery = + (TableChange.ModifyDefinitionQuery) tableChange; + return String.format( + " MODIFY DEFINITION QUERY TO '%s'", definitionQuery.getDefinitionQuery()); } else { - throw new UnsupportedOperationException( - String.format("Unknown materialized table change: %s.", tableChange)); + return AlterTableChangeOperation.toString(tableChange); } } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java index f5531f3bcf3..76ac98bd38d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java @@ -28,7 +28,7 @@ import javax.annotation.Nullable; import java.util.Arrays; import java.util.Objects; -/** {@link TableChange} represents the modification of the table. */ +/** {@link TableChange} represents the modification of the {@link CatalogBaseTable}. */ @PublicEvolving public interface TableChange { @@ -380,6 +380,16 @@ public interface TableChange { return new ModifyRefreshHandler(refreshHandlerDesc, refreshHandlerBytes); } + /** + * A table change to modify materialized table definition query. + * + * @param definitionQuery the modified definition query. + * @return a TableChange represents the modification. + */ + static ModifyDefinitionQuery modifyDefinitionQuery(String definitionQuery) { + return new ModifyDefinitionQuery(definitionQuery); + } + // -------------------------------------------------------------------------------------------- // Add Change // -------------------------------------------------------------------------------------------- @@ -392,9 +402,13 @@ public interface TableChange { * <pre> * ALTER TABLE <table_name> ADD <column_definition> <column_position> * </pre> + * + * <p>Note: An <code>ALTER MATERIALIZED TABLE AS QUERY</code> operation may also produce an + * <code>AddColumn</code> change. This occurs when the materialized table's schema is updated to + * align with the structure of the query results, which might require adding new columns. */ @PublicEvolving - class AddColumn implements TableChange { + class AddColumn implements CatalogTableChange, MaterializedTableChange { private final Column column; private final ColumnPosition position; @@ -447,7 +461,7 @@ public interface TableChange { * </pre> */ @PublicEvolving - class AddUniqueConstraint implements TableChange { + class AddUniqueConstraint implements CatalogTableChange { private final UniqueConstraint constraint; @@ -493,7 +507,7 @@ public interface TableChange { * </pre> */ @PublicEvolving - class AddDistribution implements TableChange { + class AddDistribution implements CatalogTableChange { private final TableDistribution distribution; @@ -539,7 +553,7 @@ public interface TableChange { * </pre> */ @PublicEvolving - class AddWatermark implements TableChange { + class AddWatermark implements CatalogTableChange { private final WatermarkSpec watermarkSpec; @@ -601,7 +615,7 @@ public interface TableChange { * </pre> */ @PublicEvolving - class ModifyColumn implements TableChange { + class ModifyColumn implements CatalogTableChange { protected final Column oldColumn; protected final Column newColumn; @@ -848,7 +862,7 @@ public interface TableChange { * </pre> */ @PublicEvolving - class ModifyUniqueConstraint implements TableChange { + class ModifyUniqueConstraint implements CatalogTableChange { private final UniqueConstraint newConstraint; @@ -894,7 +908,7 @@ public interface TableChange { * </pre> */ @PublicEvolving - class ModifyDistribution implements TableChange { + class ModifyDistribution implements CatalogTableChange { private final TableDistribution distribution; @@ -940,7 +954,7 @@ public interface TableChange { * </pre> */ @PublicEvolving - class ModifyWatermark implements TableChange { + class ModifyWatermark implements CatalogTableChange { private final WatermarkSpec newWatermark; @@ -988,9 +1002,14 @@ public interface TableChange { * <pre> * ALTER TABLE <table_name> DROP COLUMN <column_name> * </pre> + * + * <p>Note: A <code>DropColumn</code> change may also occur when rolling back the schema during + * a failed <code>ALTER MATERIALIZED TABLE AS QUERY</code> operation. If the operation fails, + * columns added to align with the query results may need to be removed to restore the original + * schema. */ @PublicEvolving - class DropColumn implements TableChange { + class DropColumn implements CatalogTableChange, MaterializedTableChange { private final String columnName; @@ -1036,7 +1055,7 @@ public interface TableChange { * </pre> */ @PublicEvolving - class DropWatermark implements TableChange { + class DropWatermark implements CatalogTableChange { static final DropWatermark INSTANCE = new DropWatermark(); @Override @@ -1055,7 +1074,7 @@ public interface TableChange { * </pre> */ @PublicEvolving - class DropConstraint implements TableChange { + class DropConstraint implements CatalogTableChange { private final String constraintName; @@ -1101,7 +1120,7 @@ public interface TableChange { * </pre> */ @PublicEvolving - class DropDistribution implements TableChange { + class DropDistribution implements CatalogTableChange { static final DropDistribution INSTANCE = new DropDistribution(); @Override @@ -1124,7 +1143,7 @@ public interface TableChange { * </pre> */ @PublicEvolving - class SetOption implements TableChange { + class SetOption implements CatalogTableChange { private final String key; private final String value; @@ -1177,7 +1196,7 @@ public interface TableChange { * </pre> */ @PublicEvolving - class ResetOption implements TableChange { + class ResetOption implements CatalogTableChange { private final String key; @@ -1279,6 +1298,13 @@ public interface TableChange { } } + // -------------------------------------------------------------------------------------------- + // Catalog table change + // -------------------------------------------------------------------------------------------- + /** {@link CatalogTableChange} represents the modification of the CatalogTable. */ + @PublicEvolving + interface CatalogTableChange extends TableChange {} + // -------------------------------------------------------------------------------------------- // Materialized table change // -------------------------------------------------------------------------------------------- @@ -1374,4 +1400,41 @@ public interface TableChange { + '}'; } } + + /** A table change to modify the definition query. */ + @PublicEvolving + class ModifyDefinitionQuery implements MaterializedTableChange { + + private final String definitionQuery; + + public ModifyDefinitionQuery(String definitionQuery) { + this.definitionQuery = definitionQuery; + } + + public String getDefinitionQuery() { + return definitionQuery; + } + + @Override + public String toString() { + return "ModifyDefinitionQuery{" + "definitionQuery='" + definitionQuery + '\'' + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ModifyDefinitionQuery that = (ModifyDefinitionQuery) o; + return Objects.equals(definitionQuery, that.definitionQuery); + } + + @Override + public int hashCode() { + return Objects.hash(definitionQuery); + } + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java new file mode 100644 index 00000000000..048e2e4d753 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.TableChange.MaterializedTableChange; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation; +import org.apache.flink.table.planner.operations.PlannerQueryOperation; + +import org.apache.calcite.sql.SqlNode; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** A converter for {@link SqlAlterMaterializedTableAsQuery}. */ +public class SqlAlterMaterializedTableAsQueryConverter + implements SqlNodeConverter<SqlAlterMaterializedTableAsQuery> { + + @Override + public Operation convertSqlNode( + SqlAlterMaterializedTableAsQuery sqlAlterMaterializedTableAsQuery, + ConvertContext context) { + ObjectIdentifier identifier = resolveIdentifier(sqlAlterMaterializedTableAsQuery, context); + + // Validate and extract schema from query + String originalQuery = + context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery()); + SqlNode validatedQuery = + context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery()); + // The LATERAL operator was eliminated during sql validation, thus the unparsed SQL + // does not contain LATERAL which is problematic, + // the issue was resolved in CALCITE-4077 + // (always treat the table function as implicitly LATERAL). + String definitionQuery = context.expandSqlIdentifiers(originalQuery); + PlannerQueryOperation queryOperation = + new PlannerQueryOperation( + context.toRelRoot(validatedQuery).project(), () -> originalQuery); + + ResolvedCatalogMaterializedTable oldTable = + getResolvedMaterializedTable(context, identifier); + List<Column> addedColumns = + validateAndExtractNewColumns( + oldTable.getResolvedSchema(), queryOperation.getResolvedSchema()); + + // Build new materialized table and apply changes + CatalogMaterializedTable updatedTable = + buildUpdatedMaterializedTable(oldTable, addedColumns, definitionQuery); + List<MaterializedTableChange> tableChanges = new ArrayList<>(); + addedColumns.forEach(column -> tableChanges.add(TableChange.add(column))); + tableChanges.add(TableChange.modifyDefinitionQuery(definitionQuery)); + + return new AlterMaterializedTableAsQueryOperation(identifier, tableChanges, updatedTable); + } + + private ObjectIdentifier resolveIdentifier( + SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlAlterTableAsQuery.fullTableName()); + return context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + } + + private ResolvedCatalogMaterializedTable getResolvedMaterializedTable( + ConvertContext context, ObjectIdentifier identifier) { + ResolvedCatalogBaseTable<?> baseTable = + context.getCatalogManager().getTableOrError(identifier).getResolvedTable(); + if (MATERIALIZED_TABLE != baseTable.getTableKind()) { + throw new ValidationException( + "Only materialized table support modify definition query."); + } + return (ResolvedCatalogMaterializedTable) baseTable; + } + + private CatalogMaterializedTable buildUpdatedMaterializedTable( + ResolvedCatalogMaterializedTable oldTable, + List<Column> addedColumns, + String definitionQuery) { + + Schema.Builder newSchemaBuilder = + Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema()); + addedColumns.forEach(col -> newSchemaBuilder.column(col.getName(), col.getDataType())); + + return CatalogMaterializedTable.newBuilder() + .schema(newSchemaBuilder.build()) + .comment(oldTable.getComment()) + .partitionKeys(oldTable.getPartitionKeys()) + .options(oldTable.getOptions()) + .definitionQuery(definitionQuery) + .freshness(oldTable.getDefinitionFreshness()) + .logicalRefreshMode(oldTable.getLogicalRefreshMode()) + .refreshMode(oldTable.getRefreshMode()) + .refreshStatus(oldTable.getRefreshStatus()) + .refreshHandlerDescription(oldTable.getRefreshHandlerDescription().orElse(null)) + .serializedRefreshHandler(oldTable.getSerializedRefreshHandler()) + .build(); + } + + private List<Column> validateAndExtractNewColumns( + ResolvedSchema oldSchema, ResolvedSchema newSchema) { + List<Column> newAddedColumns = new ArrayList<>(); + int originalColumnSize = oldSchema.getColumns().size(); + int newColumnSize = newSchema.getColumns().size(); + + if (originalColumnSize > newColumnSize) { + throw new ValidationException( + String.format( + "Failed to modify query because drop column is unsupported. " + + "When modifying a query, you can only append new columns at the end of original schema. " + + "The original schema has %d columns, but the newly derived schema from the query has %d columns.", + originalColumnSize, newColumnSize)); + } + + for (int i = 0; i < oldSchema.getColumns().size(); i++) { + Column oldColumn = oldSchema.getColumns().get(i); + Column newColumn = newSchema.getColumns().get(i); + if (!oldColumn.equals(newColumn)) { + throw new ValidationException( + String.format( + "When modifying the query of a materialized table, " + + "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" + + "Column mismatch at position %d: Original column is [%s], but new column is [%s].", + i, oldColumn, newColumn)); + } + } + + for (int i = oldSchema.getColumns().size(); i < newSchema.getColumns().size(); i++) { + Column newColumn = newSchema.getColumns().get(i); + newAddedColumns.add(newColumn.copy(newColumn.getDataType().nullable())); + } + + return newAddedColumns; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index 25ab375469f..87570ee9c3c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -63,6 +63,7 @@ public class SqlNodeConverters { register(new SqlAlterMaterializedTableRefreshConverter()); register(new SqlAlterMaterializedTableSuspendConverter()); register(new SqlAlterMaterializedTableResumeConverter()); + register(new SqlAlterMaterializedTableAsQueryConverter()); register(new SqlDropMaterializedTableConverter()); register(new SqlShowTablesConverter()); register(new SqlShowViewsConverter()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index ff3cf0c0849..a3b659f335b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -28,9 +28,12 @@ import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation; @@ -45,7 +48,9 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -72,6 +77,25 @@ public class SqlMaterializedTableNodeToOperationConverterTest final CatalogTable catalogTable = CatalogTable.of(tableSchema, "", Arrays.asList("b", "c"), options); catalog.createTable(path3, catalogTable, true); + + // create materialized table + final String sql = + "CREATE MATERIALIZED TABLE base_mtbl (\n" + + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "COMMENT 'materialized table comment'\n" + + "PARTITIONED BY (a, d)\n" + + "WITH (\n" + + " 'connector' = 'filesystem', \n" + + " 'format' = 'json'\n" + + ")\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t1"; + final ObjectPath path4 = new ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl"); + + CreateMaterializedTableOperation operation = (CreateMaterializedTableOperation) parse(sql); + catalog.createTable(path4, operation.getCatalogMaterializedTable(), true); } @Test @@ -391,6 +415,125 @@ public class SqlMaterializedTableNodeToOperationConverterTest .isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME WITH (k1: [v1])"); } + @Test + void testAlterMaterializedTableAsQuery() throws TableNotExistException { + String sql = + "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, d as e, cast('123' as string) as f FROM t3"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(AlterMaterializedTableAsQueryOperation.class); + + AlterMaterializedTableAsQueryOperation op = + (AlterMaterializedTableAsQueryOperation) operation; + assertThat(op.getTableChanges()) + .isEqualTo( + Arrays.asList( + TableChange.add( + Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))), + TableChange.add( + Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))), + TableChange.modifyDefinitionQuery( + "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" + + "FROM `builtin`.`default`.`t3`"))); + assertThat(operation.asSummaryString()) + .isEqualTo( + "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" + + "FROM `builtin`.`default`.`t3`"); + + // new table only difference schema & definition query with old table. + CatalogMaterializedTable oldTable = + (CatalogMaterializedTable) + catalog.getTable( + new ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl")); + CatalogMaterializedTable newTable = op.getNewMaterializedTable(); + + assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema()); + assertThat(oldTable.getUnresolvedSchema().getPrimaryKey()) + .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey()); + assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs()) + .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs()); + assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery()); + assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); + assertThat(oldTable.getRefreshMode()).isEqualTo(newTable.getRefreshMode()); + assertThat(oldTable.getRefreshStatus()).isEqualTo(newTable.getRefreshStatus()); + assertThat(oldTable.getSerializedRefreshHandler()) + .isEqualTo(newTable.getSerializedRefreshHandler()); + + List<Schema.UnresolvedColumn> addedColumn = + newTable.getUnresolvedSchema().getColumns().stream() + .filter( + column -> + !oldTable.getUnresolvedSchema() + .getColumns() + .contains(column)) + .collect(Collectors.toList()); + // added column should be a nullable column. + assertThat(addedColumn) + .isEqualTo( + Arrays.asList( + new Schema.UnresolvedPhysicalColumn( + "e", DataTypes.VARCHAR(Integer.MAX_VALUE)), + new Schema.UnresolvedPhysicalColumn( + "f", DataTypes.VARCHAR(Integer.MAX_VALUE)))); + } + + @Test + void testAlterMaterializedTableAsQueryWithConflictColumnName() { + String sql5 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, c as a FROM t3"; + AlterMaterializedTableAsQueryOperation sqlAlterMaterializedTableAsQuery = + (AlterMaterializedTableAsQueryOperation) parse(sql5); + + assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges()) + .isEqualTo( + Arrays.asList( + TableChange.add(Column.physical("a0", DataTypes.INT())), + TableChange.modifyDefinitionQuery( + "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`c` AS `a`\n" + + "FROM `builtin`.`default`.`t3`"))); + } + + @Test + void testAlterMaterializedTableAsQueryWithUnsupportedColumnChange() { + // 1. delete existing column + String sql1 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b FROM t3"; + assertThatThrownBy(() -> parse(sql1)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Failed to modify query because drop column is unsupported. When modifying a query, you can only append new columns at the end of original schema. The original schema has 4 columns, but the newly derived schema from the query has 2 columns."); + // 2. swap column position + String sql2 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, d, c FROM t3"; + assertThatThrownBy(() -> parse(sql2)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "When modifying the query of a materialized table, currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" + + "Column mismatch at position 2: Original column is [`c` INT], but new column is [`d` STRING]."); + // 3. change existing column type + String sql3 = + "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, cast(d as int) as d FROM t3"; + assertThatThrownBy(() -> parse(sql3)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "When modifying the query of a materialized table, currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" + + "Column mismatch at position 3: Original column is [`d` STRING], but new column is [`d` INT]."); + // 4. change existing column nullability + String sql4 = + "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, cast('d' as string) as d FROM t3"; + assertThatThrownBy(() -> parse(sql4)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "When modifying the query of a materialized table, currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" + + "Column mismatch at position 3: Original column is [`d` STRING], but new column is [`d` STRING NOT NULL]."); + } + + @Test + void testAlterAlterMaterializedTableAsQueryWithCatalogTable() { + // t1 is a CatalogTable not a Materialized Table + final String sql = "ALTER MATERIALIZED TABLE t1 AS SELECT * FROM t1"; + assertThatThrownBy(() -> parse(sql)) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Only materialized table support modify definition query."); + } + @Test void testDropMaterializedTable() { final String sql = "DROP MATERIALIZED TABLE mtbl1";