This is an automated email from the ASF dual-hosted git repository. AHeise pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 131d7c69018ed144f011c855dd2075a3f355f05b Author: Arvid Heise <[email protected]> AuthorDate: Mon May 18 21:12:17 2026 +0200 [FLINK-39700][table-api-java] Consolidate query-change and drop validation into AlterMaterializedTableChangeOperation Validation that was scattered across MaterializedTableChangeHandler (checkForChangedPositionByQuery, droppedPersistedCnt, validationErrors) and SqlAlterMaterializedTableDropSchemaConverter is consolidated into AlterMaterializedTableChangeOperation.validateChanges(). The handler becomes a pure applier: it throws immediately on unknown changes and applies DropColumn unconditionally. Same errors are thrown, just from the right layer. Tests for both layers (AlterMaterializedTableChangeOperationValidationTest, AlterMaterializedTableAsQueryOperationValidationTest, MaterializedTableChangeHandlerTest) are added alongside the prod change. --- .../AlterMaterializedTableChangeOperation.java | 122 ++++++++++++- .../MaterializedTableChangeHandler.java | 150 +--------------- ...ializedTableAsQueryOperationValidationTest.java | 199 +++++++++++++++++++++ ...rializedTableChangeOperationValidationTest.java | 189 +++++++++++++++++++ .../MaterializedTableChangeHandlerTest.java | 195 ++++++++++++++++++++ ...lAlterMaterializedTableDropSchemaConverter.java | 9 - ...erializedTableNodeToOperationConverterTest.java | 10 +- 7 files changed, 716 insertions(+), 158 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java index 13a3019a5c7..2961dc88a10 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java @@ -18,22 +18,34 @@ package org.apache.flink.table.operations.materializedtable; +import org.apache.flink.annotation.Confluent; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.internal.TableResultImpl; import org.apache.flink.table.api.internal.TableResultInternal; import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.TableChange.After; +import org.apache.flink.table.catalog.TableChange.ColumnPosition; +import org.apache.flink.table.catalog.TableChange.DropColumn; +import org.apache.flink.table.catalog.TableChange.ModifyColumnPosition; import org.apache.flink.table.catalog.TableChange.ModifyDefinitionQuery; +import org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType; import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler; import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus; import org.apache.flink.table.catalog.TableChange.ModifyStartMode; import org.apache.flink.table.operations.ddl.AlterTableChangeOperation; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * Alter materialized table with new table definition and table changes represents the modification. @@ -42,7 +54,7 @@ import java.util.stream.Collectors; public class AlterMaterializedTableChangeOperation extends AlterMaterializedTableOperation { private final Function<ResolvedCatalogMaterializedTable, List<TableChange>> tableChangeForTable; - private final ResolvedCatalogMaterializedTable oldTable; + private ResolvedCatalogMaterializedTable oldTable; private MaterializedTableChangeHandler handler; private CatalogMaterializedTable newTable; private List<TableChange> tableChanges; @@ -77,6 +89,17 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl return newTable; } + @Confluent + public ResolvedCatalogMaterializedTable getOldTable() { + return oldTable; + } + + @Confluent + public void setOldTable(final ResolvedCatalogMaterializedTable oldTable) { + this.oldTable = oldTable; + this.newTable = null; + } + protected MaterializedTableChangeHandler getHandlerWithChanges() { if (handler == null) { handler = @@ -86,8 +109,105 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl return handler; } + @VisibleForTesting + public void validateChanges() { + final List<TableChange> changes = getTableChanges(); + final boolean isQueryChange = + changes.stream().anyMatch(ModifyDefinitionQuery.class::isInstance); + final List<Column> oldColumns = oldTable.getResolvedSchema().getColumns(); + final Map<String, Integer> columnIndex = + IntStream.range(0, oldColumns.size()) + .boxed() + .collect( + Collectors.toMap( + i -> oldColumns.get(i).getName(), Function.identity())); + final List<String> errors = new ArrayList<>(); + for (final TableChange change : changes) { + if (change instanceof DropColumn) { + checkDroppedColumn((DropColumn) change, oldColumns, columnIndex, errors); + } else if (isQueryChange && change instanceof ModifyColumnPosition) { + checkPositionChange((ModifyColumnPosition) change, oldColumns, columnIndex, errors); + } else if (isQueryChange && change instanceof ModifyPhysicalColumnType) { + checkPhysicalTypeChange( + (ModifyPhysicalColumnType) change, oldColumns, columnIndex, errors); + } + } + if (!errors.isEmpty()) { + throw new ValidationException(String.join("\n", errors)); + } + } + + private void checkDroppedColumn( + DropColumn change, + List<Column> oldColumns, + Map<String, Integer> columnIndex, + List<String> errors) { + final Integer idx = columnIndex.get(change.getColumnName()); + if (idx != null && oldColumns.get(idx).isPersisted()) { + errors.add( + String.format( + "Dropping of persisted column `%s` is not supported.", + change.getColumnName())); + } + } + + private void checkPositionChange( + ModifyColumnPosition change, + List<Column> oldColumns, + Map<String, Integer> columnIndex, + List<String> errors) { + final int index = displacedColumnIndex(change.getNewPosition(), columnIndex); + if (index < 0) { + return; + } + errors.add( + positionChangeError( + oldColumns.get(index).asSummaryString(), + change.getNewColumn().asSummaryString(), + index)); + } + + private void checkPhysicalTypeChange( + ModifyPhysicalColumnType change, + List<Column> oldColumns, + Map<String, Integer> columnIndex, + List<String> errors) { + final Integer idx = columnIndex.get(change.getOldColumn().getName()); + if (idx != null) { + errors.add( + positionChangeError( + change.getOldColumn().asSummaryString(), + change.getNewColumn().asSummaryString(), + idx)); + } + } + + /** + * Returns the index in oldColumns of the column displaced by this position change, or -1 if + * positioned after a column absent from the old schema (i.e., appended after a new column). + */ + private static int displacedColumnIndex( + ColumnPosition position, Map<String, Integer> columnIndex) { + if (position == ColumnPosition.first()) { + return 0; + } + final Integer afterIdx = columnIndex.get(((After) position).column()); + // afterIdx == null → after a new column not in the old schema → treat as append + // afterIdx == last → after the last old column → also treat as append (no displacement) + return afterIdx != null && afterIdx < columnIndex.size() - 1 ? afterIdx + 1 : -1; + } + + private static String positionChangeError(String oldColumn, String newColumn, int position) { + return String.format( + "When modifying the query of a materialized table, " + + "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" + + "Column mismatch at position %d: Original column is [%s], but new column is [%s].", + position + 1, oldColumn, newColumn); + } + @Override public TableResultInternal execute(Context ctx) { + validateChanges(); ctx.getCatalogManager() .alterTable(getNewTable(), getTableChanges(), getTableIdentifier(), false); return TableResultImpl.TABLE_RESULT_OK; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java index 54a55414a8e..7521c94a791 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java @@ -58,7 +58,6 @@ import org.apache.flink.table.types.DataType; import javax.annotation.Nullable; -import java.util.ArrayList; import java.util.HashMap; import java.util.IdentityHashMap; import java.util.LinkedList; @@ -66,16 +65,14 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.BiConsumer; -import java.util.stream.Collectors; -/** Applying table changes to old materialized table and gathering validation errors. */ +/** Applies table changes to a materialized table and builds the resulting definition. */ @Internal public class MaterializedTableChangeHandler { private static final HandlerRegistry HANDLER_REGISTRY = createHandlerRegistry(); private final List<UnresolvedColumn> columns; private final CatalogMaterializedTable oldTable; - private boolean isQueryChange; private @Nullable TableDistribution distribution; private CatalogMaterializedTable.RefreshStatus refreshStatus; private @Nullable String refreshHandlerDesc; @@ -83,12 +80,10 @@ public class MaterializedTableChangeHandler { private List<UnresolvedWatermarkSpec> watermarkSpecs; private String primaryKeyName = null; private List<String> primaryKeyColumns = null; - private int droppedPersistedCnt = 0; private String originalQuery; private String expandedQuery; private StartMode startMode; private final Map<String, String> options; - private final List<String> validationErrors = new ArrayList<>(); public MaterializedTableChangeHandler(CatalogMaterializedTable oldTable) { this.distribution = oldTable.getDistribution().orElse(null); @@ -122,10 +117,10 @@ public class MaterializedTableChangeHandler { private void apply(MaterializedTableChangeHandler context, TableChange change) { HandlerRegistry.HandlerWrapper<?> wrapper = HANDLERS.get(change.getClass()); if (wrapper == null) { - context.validationErrors.add("Unknown table change " + change.getClass()); - } else { - wrapper.accept(context, change); + throw new ValidationException( + "Unsupported table change: " + change.getClass().getName()); } + wrapper.accept(context, change); } private static final class HandlerWrapper<T extends TableChange> { @@ -141,24 +136,15 @@ public class MaterializedTableChangeHandler { } } - public List<String> getValidationErrors() { - return List.copyOf(validationErrors); - } - public static MaterializedTableChangeHandler getHandlerWithChanges( CatalogMaterializedTable oldTable, List<TableChange> tableChanges) { - MaterializedTableChangeHandler handler = new MaterializedTableChangeHandler(oldTable); + final MaterializedTableChangeHandler handler = new MaterializedTableChangeHandler(oldTable); handler.applyTableChanges(tableChanges); return handler; } public static CatalogMaterializedTable buildNewMaterializedTable( MaterializedTableChangeHandler context) { - final List<String> validationErrors = context.getValidationErrors(); - if (!validationErrors.isEmpty()) { - throw new ValidationException(String.join("\n", validationErrors)); - } - final CatalogMaterializedTable oldTable = context.getOldTable(); return CatalogMaterializedTable.newBuilder() .schema(context.retrieveSchema()) @@ -217,6 +203,9 @@ public class MaterializedTableChangeHandler { registry.register( ModifyRefreshStatus.class, MaterializedTableChangeHandler::modifyRefreshStatus); + // Start mode + registry.register(ModifyStartMode.class, MaterializedTableChangeHandler::modifyStartMode); + // Distribution operations registry.register(AddDistribution.class, MaterializedTableChangeHandler::addDistribution); registry.register( @@ -233,25 +222,9 @@ public class MaterializedTableChangeHandler { } void applyTableChanges(List<TableChange> tableChanges) { - isQueryChange = tableChanges.stream().anyMatch(t -> t instanceof ModifyDefinitionQuery); - Schema oldSchema = oldTable.getUnresolvedSchema(); - if (isQueryChange) { - checkForChangedPositionByQuery(tableChanges, oldSchema); - } - for (TableChange tableChange : tableChanges) { HANDLER_REGISTRY.apply(this, tableChange); } - - if (droppedPersistedCnt > 0 && isQueryChange) { - final int schemaSize = oldSchema.getColumns().size(); - validationErrors.add( - String.format( - "Failed to modify query because drop column is unsupported. " - + "When modifying a query, you can only append new columns at the end of original schema. " - + "The original schema has %d columns, but the newly derived schema from the query has %d columns.", - schemaSize, schemaSize - droppedPersistedCnt)); - } } public Schema retrieveSchema() { @@ -324,15 +297,7 @@ public class MaterializedTableChangeHandler { } private void dropColumn(TableChange.DropColumn dropColumn) { - String droppedColumnName = dropColumn.getColumnName(); - int index = getColumnIndex(droppedColumnName); - UnresolvedColumn column = columns.get(index); - if (isQueryChange && isNonPersistedColumn(column)) { - // noop - } else { - columns.remove(index); - droppedPersistedCnt++; - } + columns.remove(getColumnIndex(dropColumn.getColumnName())); } private void modifyPhysicalColumnType(ModifyPhysicalColumnType modifyPhysicalColumnType) { @@ -350,15 +315,6 @@ public class MaterializedTableChangeHandler { private void modifyColumnPosition(ModifyColumnPosition columnWithChangedPosition) { Column column = columnWithChangedPosition.getOldColumn(); int oldPosition = getColumnIndex(column.getName()); - if (isQueryChange) { - validationErrors.add( - String.format( - "When modifying the query of a materialized table, " - + "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" - + "Column mismatch at position %d: Original column is [%s], but new column is [%s].", - oldPosition + 1, column, column)); - } - ColumnPosition position = columnWithChangedPosition.getNewPosition(); UnresolvedColumn changedPositionColumn = columns.get(oldPosition); columns.remove(oldPosition); @@ -370,12 +326,6 @@ public class MaterializedTableChangeHandler { originalQuery = queryChange.getOriginalQuery(); } - private boolean isNonPersistedColumn(UnresolvedColumn column) { - return column instanceof Schema.UnresolvedComputedColumn - || column instanceof Schema.UnresolvedMetadataColumn - && ((Schema.UnresolvedMetadataColumn) column).isVirtual(); - } - private void addUniqueConstraint(AddUniqueConstraint addUniqueConstraint) { final UniqueConstraint constraint = addUniqueConstraint.getConstraint(); primaryKeyName = constraint.getName(); @@ -461,88 +411,6 @@ public class MaterializedTableChangeHandler { } } - private void checkForChangedPositionByQuery(List<TableChange> tableChanges, Schema oldSchema) { - List<ModifyColumnPosition> positionChanges = - tableChanges.stream() - .filter(t -> t instanceof ModifyColumnPosition) - .map(t -> (ModifyColumnPosition) t) - .collect(Collectors.toList()); - - List<ModifyPhysicalColumnType> physicalTypeChanges = - tableChanges.stream() - .filter(t -> t instanceof ModifyPhysicalColumnType) - .map(t -> (ModifyPhysicalColumnType) t) - .collect(Collectors.toList()); - - if (positionChanges.isEmpty() && physicalTypeChanges.isEmpty()) { - return; - } - - int persistedColumnOffset = 0; - List<UnresolvedColumn> oldColumns = oldSchema.getColumns(); - for (UnresolvedColumn column : oldColumns) { - if (!isNonPersistedColumn(column)) { - persistedColumnOffset++; - } - } - - Map<String, Column> afterToColumnName = new HashMap<>(); - for (ModifyColumnPosition change : positionChanges) { - final ColumnPosition position = change.getNewPosition(); - final Column newColumn = change.getNewColumn(); - if (position == ColumnPosition.first()) { - if (persistedColumnOffset == 0) { - positionChangeError( - newColumn.asSummaryString(), - oldColumns.get(persistedColumnOffset).toString(), - persistedColumnOffset); - } else { - afterToColumnName.put( - oldColumns.get(persistedColumnOffset).getName(), newColumn); - } - } else { - afterToColumnName.put(((After) position).column(), newColumn); - } - } - - for (int i = 0; i < oldColumns.size() - 1; i++) { - Column newColumn = afterToColumnName.get(oldColumns.get(i).getName()); - if (newColumn != null) { - positionChangeError(oldColumns.get(i + 1), newColumn, i + 1); - } - } - - Map<String, Integer> nameToIndex = new HashMap<>(); - for (int i = 0; i < oldColumns.size(); i++) { - UnresolvedColumn column = oldColumns.get(i); - if (!isNonPersistedColumn(column)) { - nameToIndex.put(column.getName(), i); - } - } - - for (ModifyPhysicalColumnType change : physicalTypeChanges) { - final int index = nameToIndex.get(change.getOldColumn().getName()); - positionChangeError(change.getOldColumn(), change.getNewColumn(), index); - } - } - - private void positionChangeError(UnresolvedColumn oldColumn, Column newColumn, int position) { - positionChangeError(oldColumn.toString(), newColumn.asSummaryString(), position); - } - - private void positionChangeError(Column oldColumn, Column newColumn, int position) { - positionChangeError(oldColumn.asSummaryString(), newColumn.asSummaryString(), position); - } - - private void positionChangeError(String oldColumn, String newColumn, int position) { - validationErrors.add( - String.format( - "When modifying the query of a materialized table, " - + "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" - + "Column mismatch at position %d: Original column is [%s], but new column is [%s].", - position + 1, oldColumn, newColumn)); - } - private void setColumnAtPosition(UnresolvedColumn column, ColumnPosition position) { if (position == null) { columns.add(column); diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperationValidationTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperationValidationTest.java new file mode 100644 index 00000000000..8a513278bf7 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperationValidationTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.TableChange.ColumnPosition; +import org.apache.flink.table.expressions.utils.ResolvedExpressionMock; + +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.apache.flink.table.catalog.Column.computed; +import static org.apache.flink.table.catalog.Column.physical; +import static org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperationValidationTest.resolvedTable; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for validation of {@link AlterMaterializedTableAsQueryOperation} (ALTER MATERIALIZED TABLE + * ... AS <query>). CREATE OR ALTER goes through {@link FullAlterMaterializedTableOperation} + * and is exercised separately by the planner integration tests. + */ +class AlterMaterializedTableAsQueryOperationValidationTest { + + private static final TableChange QUERY_CHANGE = + TableChange.modifyDefinitionQuery( + "SELECT a, b FROM src", "SELECT `src`.`a`, `src`.`b` FROM `src`"); + + @Test + void rejectDropPersistedColumn() { + final ResolvedCatalogMaterializedTable oldTable = + resolvedTable( + ResolvedSchema.of( + physical("a", DataTypes.INT()), physical("b", DataTypes.STRING()))); + + final AlterMaterializedTableAsQueryOperation op = + operation( + oldTable, + List.of( + TableChange.modifyDefinitionQuery( + "SELECT b FROM src", "SELECT `src`.`b` FROM `src`"), + TableChange.dropColumn("a"))); + + assertThatThrownBy(op::validateChanges) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Dropping of persisted column `a` is not supported."); + } + + @Test + void acceptDropNonPersistedColumn() { + final ResolvedCatalogMaterializedTable oldTable = + resolvedTable( + ResolvedSchema.of( + physical("a", DataTypes.INT()), + computed( + "comp", + new ResolvedExpressionMock( + DataTypes.STRING(), () -> "UPPER(a)")))); + + final AlterMaterializedTableAsQueryOperation op = + operation( + oldTable, + List.of( + TableChange.modifyDefinitionQuery( + "SELECT a FROM src2", "SELECT `src2`.`a` FROM `src2`"), + TableChange.dropColumn("comp"))); + + assertThatNoException().isThrownBy(op::validateChanges); + } + + @Test + void rejectReorderColumn() { + final ResolvedCatalogMaterializedTable oldTable = + resolvedTable( + ResolvedSchema.of( + physical("a", DataTypes.INT()), + physical("b", DataTypes.STRING()), + physical("c", DataTypes.BIGINT()))); + + final AlterMaterializedTableAsQueryOperation op = + operation( + oldTable, + List.of( + TableChange.modifyDefinitionQuery( + "SELECT a, c, b FROM src", + "SELECT `src`.`a`, `src`.`c`, `src`.`b` FROM `src`"), + TableChange.modifyColumnPosition( + physical("c", DataTypes.BIGINT()), + ColumnPosition.after("a")))); + + assertThatThrownBy(op::validateChanges) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Column mismatch at position 2: Original column is [`b` STRING], but new column is [`c` BIGINT]."); + } + + @Test + void rejectReorderColumnToFirst() { + final ResolvedCatalogMaterializedTable oldTable = + resolvedTable( + ResolvedSchema.of( + physical("a", DataTypes.INT()), physical("b", DataTypes.STRING()))); + + final AlterMaterializedTableAsQueryOperation op = + operation( + oldTable, + List.of( + TableChange.modifyDefinitionQuery( + "SELECT b, a FROM src", + "SELECT `src`.`b`, `src`.`a` FROM `src`"), + TableChange.modifyColumnPosition( + physical("b", DataTypes.STRING()), + ColumnPosition.first()))); + + assertThatThrownBy(op::validateChanges) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Column mismatch at position 1: Original column is [`a` INT], but new column is [`b` STRING]."); + } + + @Test + void rejectTypeChange() { + final ResolvedCatalogMaterializedTable oldTable = + resolvedTable( + ResolvedSchema.of( + physical("a", DataTypes.INT()), physical("b", DataTypes.STRING()))); + + final AlterMaterializedTableAsQueryOperation op = + operation( + oldTable, + List.of( + TableChange.modifyDefinitionQuery( + "SELECT CAST(a AS BIGINT), b FROM src", + "SELECT CAST(`src`.`a` AS BIGINT), `src`.`b` FROM `src`"), + TableChange.modifyPhysicalColumnType( + physical("a", DataTypes.INT()), DataTypes.BIGINT()))); + + assertThatThrownBy(op::validateChanges) + .isInstanceOf(ValidationException.class) + .hasMessageContaining( + "Column mismatch at position 1: Original column is [`a` INT], but new column is [`a` BIGINT]."); + } + + @Test + void acceptAppendColumn() { + final ResolvedCatalogMaterializedTable oldTable = + resolvedTable( + ResolvedSchema.of( + physical("a", DataTypes.INT()), physical("b", DataTypes.STRING()))); + + final AlterMaterializedTableAsQueryOperation op = + operation( + oldTable, + List.of(QUERY_CHANGE, TableChange.add(physical("c", DataTypes.BIGINT())))); + + assertThatNoException().isThrownBy(op::validateChanges); + } + + @Test + void acceptQueryOnlyChange() { + final ResolvedCatalogMaterializedTable oldTable = + resolvedTable( + ResolvedSchema.of( + physical("a", DataTypes.INT()), physical("b", DataTypes.STRING()))); + + final AlterMaterializedTableAsQueryOperation op = + operation(oldTable, List.of(QUERY_CHANGE)); + + assertThatNoException().isThrownBy(op::validateChanges); + } + + private static AlterMaterializedTableAsQueryOperation operation( + ResolvedCatalogMaterializedTable oldTable, List<TableChange> changes) { + return new AlterMaterializedTableAsQueryOperation( + ObjectIdentifier.of("cat", "db", "mt"), ignored -> changes, oldTable); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperationValidationTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperationValidationTest.java new file mode 100644 index 00000000000..5497e19fb7f --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperationValidationTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus; +import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.StartMode; +import org.apache.flink.table.catalog.StartMode.StartModeKind; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.expressions.utils.ResolvedExpressionMock; + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.catalog.Column.computed; +import static org.apache.flink.table.catalog.Column.metadata; +import static org.apache.flink.table.catalog.Column.physical; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Tests for {@link AlterMaterializedTableChangeOperation#validateChanges()} when the operation + * carries explicit DDL changes (no query change). Query-change validation is exercised by {@link + * AlterMaterializedTableAsQueryOperationValidationTest}. + */ +class AlterMaterializedTableChangeOperationValidationTest { + + @Test + void rejectDropPersistedColumn() { + final ResolvedCatalogMaterializedTable oldTable = + resolvedTable( + ResolvedSchema.of( + physical("a", DataTypes.INT()), physical("b", DataTypes.STRING()))); + + final AlterMaterializedTableChangeOperation op = + operation(oldTable, List.of(TableChange.dropColumn("a"))); + + assertThatThrownBy(op::validateChanges) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Dropping of persisted column `a` is not supported."); + } + + @Test + void rejectDropPersistedMetadataColumn() { + final ResolvedCatalogMaterializedTable oldTable = + resolvedTable( + ResolvedSchema.of( + physical("a", DataTypes.INT()), + metadata("m", DataTypes.STRING(), null, false))); + + final AlterMaterializedTableChangeOperation op = + operation(oldTable, List.of(TableChange.dropColumn("m"))); + + assertThatThrownBy(op::validateChanges) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Dropping of persisted column `m` is not supported."); + } + + @Test + void acceptDropComputedColumn() { + final ResolvedCatalogMaterializedTable oldTable = + resolvedTable( + ResolvedSchema.of( + physical("a", DataTypes.INT()), + computed( + "c", + new ResolvedExpressionMock( + DataTypes.STRING(), () -> "UPPER(a)")))); + + final AlterMaterializedTableChangeOperation op = + operation(oldTable, List.of(TableChange.dropColumn("c"))); + + assertThatNoException().isThrownBy(op::validateChanges); + } + + @Test + void acceptDropVirtualMetadataColumn() { + final ResolvedCatalogMaterializedTable oldTable = + resolvedTable( + ResolvedSchema.of( + physical("a", DataTypes.INT()), + metadata("v", DataTypes.STRING(), null, true))); + + final AlterMaterializedTableChangeOperation op = + operation(oldTable, List.of(TableChange.dropColumn("v"))); + + assertThatNoException().isThrownBy(op::validateChanges); + } + + @Test + void acceptDropNonExistentColumn() { + final ResolvedCatalogMaterializedTable oldTable = + resolvedTable( + ResolvedSchema.of( + physical("a", DataTypes.INT()), physical("b", DataTypes.STRING()))); + + final AlterMaterializedTableChangeOperation op = + operation(oldTable, List.of(TableChange.dropColumn("does-not-exist"))); + + // No column to validate against; planner-level checks handle this elsewhere. + assertThatNoException().isThrownBy(op::validateChanges); + } + + @Test + void acceptAddColumn() { + final ResolvedCatalogMaterializedTable oldTable = + resolvedTable( + ResolvedSchema.of( + physical("a", DataTypes.INT()), physical("b", DataTypes.STRING()))); + + final AlterMaterializedTableChangeOperation op = + operation(oldTable, List.of(TableChange.add(physical("c", DataTypes.BIGINT())))); + + assertThatNoException().isThrownBy(op::validateChanges); + } + + @Test + void acceptCommentChange() { + final ResolvedCatalogMaterializedTable oldTable = + resolvedTable( + ResolvedSchema.of( + physical("a", DataTypes.INT()), physical("b", DataTypes.STRING()))); + + final AlterMaterializedTableChangeOperation op = + operation( + oldTable, + List.of( + TableChange.modifyColumnComment( + physical("a", DataTypes.INT()), "new comment"))); + + assertThatNoException().isThrownBy(op::validateChanges); + } + + static ResolvedCatalogMaterializedTable resolvedTable(ResolvedSchema resolvedSchema) { + final CatalogMaterializedTable origin = + CatalogMaterializedTable.newBuilder() + .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build()) + .comment("") + .partitionKeys(List.of()) + .options(Map.of()) + .originalQuery("SELECT a FROM src") + .expandedQuery("SELECT `src`.`a` FROM `src`") + .freshness(IntervalFreshness.ofSecond(60)) + .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC) + .refreshMode(RefreshMode.CONTINUOUS) + .refreshStatus(RefreshStatus.INITIALIZING) + .startMode(StartMode.of(StartModeKind.FROM_BEGINNING)) + .build(); + return new ResolvedCatalogMaterializedTable( + origin, + resolvedSchema, + RefreshMode.CONTINUOUS, + IntervalFreshness.ofSecond(60), + StartMode.of(StartModeKind.FROM_BEGINNING)); + } + + static AlterMaterializedTableChangeOperation operation( + ResolvedCatalogMaterializedTable oldTable, List<TableChange> changes) { + return new AlterMaterializedTableChangeOperation( + ObjectIdentifier.of("cat", "db", "mt"), ignored -> changes, oldTable); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandlerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandlerTest.java new file mode 100644 index 00000000000..3c5e4bdbf09 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandlerTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.IntervalFreshness; +import org.apache.flink.table.catalog.StartMode; +import org.apache.flink.table.catalog.StartMode.StartModeKind; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.TableChange.ColumnPosition; +import org.apache.flink.table.expressions.utils.ResolvedExpressionMock; + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.InstanceOfAssertFactories.type; + +/** + * Tests for {@link MaterializedTableChangeHandler} — pure applier behavior exercised without the + * planner stack. Validation lives in {@link + * AlterMaterializedTableChangeOperation#validateChanges()}; tests there assert what is rejected. + */ +class MaterializedTableChangeHandlerTest { + + /** Default schema used by tests that don't refine it: [a INT, b STRING]. */ + private static final Schema DEFAULT_SCHEMA = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .build(); + + private static CatalogMaterializedTable.Builder defaultBuilder() { + return CatalogMaterializedTable.newBuilder() + .schema(DEFAULT_SCHEMA) + .comment("") + .partitionKeys(List.of()) + .options(Map.of()) + .originalQuery("dummy") + .expandedQuery("dummy") + .freshness(IntervalFreshness.ofSecond(60)) + .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC) + .refreshMode(RefreshMode.CONTINUOUS) + .refreshStatus(RefreshStatus.INITIALIZING) + .startMode(StartMode.of(StartModeKind.FROM_BEGINNING)); + } + + @Test + void shouldDropPersistedColumn() { + // old schema: [a INT, b STRING] + final CatalogMaterializedTable newTable = + applyChanges(defaultBuilder().build(), List.of(TableChange.dropColumn("a"))); + + assertThat(newTable.getUnresolvedSchema().getColumns()) + .extracting(Schema.UnresolvedColumn::getName) + .containsExactly("b"); + } + + @Test + void shouldDropComputedColumn() { + // old schema: [a INT, comp AS UPPER(a)] + final CatalogMaterializedTable tableWithComputed = + defaultBuilder() + .schema( + Schema.newBuilder() + .column("a", DataTypes.INT()) + .columnByExpression("comp", "UPPER(a)") + .build()) + .build(); + + final CatalogMaterializedTable newTable = + applyChanges(tableWithComputed, List.of(TableChange.dropColumn("comp"))); + + assertThat(newTable.getUnresolvedSchema().getColumns()) + .extracting(Schema.UnresolvedColumn::getName) + .containsExactly("a"); + } + + @Test + void shouldDropVirtualMetadataColumn() { + // old schema: [a INT, v STRING METADATA VIRTUAL] + final CatalogMaterializedTable tableWithMetadata = + defaultBuilder() + .schema( + Schema.newBuilder() + .column("a", DataTypes.INT()) + .columnByMetadata("v", DataTypes.STRING(), null, true) + .build()) + .build(); + + final CatalogMaterializedTable newTable = + applyChanges(tableWithMetadata, List.of(TableChange.dropColumn("v"))); + + assertThat(newTable.getUnresolvedSchema().getColumns()) + .extracting(Schema.UnresolvedColumn::getName) + .containsExactly("a"); + } + + @Test + void shouldApplyAddColumnAfter() { + // old schema: [a INT, b STRING] + final CatalogMaterializedTable newTable = + applyChanges( + defaultBuilder().build(), + List.of( + TableChange.add( + Column.physical("x", DataTypes.BIGINT()), + ColumnPosition.after("a")))); + + assertThat(newTable.getUnresolvedSchema().getColumns()) + .extracting(Schema.UnresolvedColumn::getName) + .containsExactly("a", "x", "b"); + assertThat(newTable.getUnresolvedSchema().getColumns()) + .element(1) + .asInstanceOf(type(Schema.UnresolvedPhysicalColumn.class)) + .returns(DataTypes.BIGINT(), Schema.UnresolvedPhysicalColumn::getDataType); + } + + @Test + void shouldApplyAddComputedColumnFirst() { + // old schema: [a INT, b STRING] + final CatalogMaterializedTable newTable = + applyChanges( + defaultBuilder().build(), + List.of( + TableChange.add( + Column.computed( + "comp", + new ResolvedExpressionMock( + DataTypes.INT(), () -> "1")), + ColumnPosition.first()))); + + assertThat(newTable.getUnresolvedSchema().getColumns()) + .extracting(Schema.UnresolvedColumn::getName) + .containsExactly("comp", "a", "b"); + } + + @Test + void shouldApplyAddVirtualMetadataColumn() { + // old schema: [a INT, b STRING] + final CatalogMaterializedTable newTable = + applyChanges( + defaultBuilder().build(), + List.of( + TableChange.add( + Column.metadata("m", DataTypes.STRING(), null, true)))); + + assertThat(newTable.getUnresolvedSchema().getColumns()) + .extracting(Schema.UnresolvedColumn::getName) + .containsExactly("a", "b", "m"); + } + + @Test + void shouldRejectUnsupportedTableChange() { + // old schema: [a INT, b STRING] + final TableChange unsupported = new TableChange() {}; + + assertThatThrownBy(() -> applyChanges(defaultBuilder().build(), List.of(unsupported))) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("Unsupported table change"); + } + + private static CatalogMaterializedTable applyChanges( + CatalogMaterializedTable oldTable, List<TableChange> changes) { + final MaterializedTableChangeHandler handler = + MaterializedTableChangeHandler.getHandlerWithChanges(oldTable, changes); + return MaterializedTableChangeHandler.buildNewMaterializedTable(handler); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java index e8f8a41b44d..1e889fdbb55 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java @@ -24,7 +24,6 @@ import org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTab import org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema.SqlAlterMaterializedTableDropSchema; import org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema.SqlAlterMaterializedTableDropWatermark; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.TableChange; @@ -170,14 +169,6 @@ public abstract class SqlAlterMaterializedTableDropSchemaConverter< OperationConverterUtils.validateAndGatherDropColumn( oldTable, columnsToDrop, EX_MSG_PREFIX); validateColumnsUsedInQuery(oldTable, alterTableSchema, context); - for (Column column : oldTable.getResolvedSchema().getColumns()) { - if (column.isPersisted() && columnsToDrop.contains(column.getName())) { - throw new ValidationException( - String.format( - "%sThe column `%s` is a persisted column. Dropping of persisted columns is not supported.", - EX_MSG_PREFIX, column.getName())); - } - } return tableChanges; } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index 1b458494542..caaf0578db2 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -403,7 +403,7 @@ class SqlMaterializedTableNodeToOperationConverterTest () -> { AlterMaterializedTableChangeOperation operation = (AlterMaterializedTableChangeOperation) parse(spec.sql); - operation.getNewTable(); + operation.validateChanges(); }) .as(spec.sql) .isInstanceOf(spec.expectedException) @@ -714,10 +714,7 @@ class SqlMaterializedTableNodeToOperationConverterTest return List.of( TestSpec.of( "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b FROM t3", - "Failed to modify query because drop column is unsupported. When modifying " - + "a query, you can only append new columns at the end of original " - + "schema. The original schema has 4 columns, but the newly derived " - + "schema from the query has 2 columns."), + "Dropping of persisted column `c` is not supported."), TestSpec.of( "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, d, c FROM t3", "When modifying the query of a materialized table, currently only support " @@ -1045,8 +1042,7 @@ class SqlMaterializedTableNodeToOperationConverterTest + "Column(s) ('d') are used in query."), TestSpec.of( "ALTER MATERIALIZED TABLE base_mtbl_with_metadata DROP m_p", - "Failed to execute ALTER MATERIALIZED TABLE statement.\n" - + "The column `m_p` is a persisted column. Dropping of persisted columns is not supported.")); + "Dropping of persisted column `m_p` is not supported.")); } private static Collection<TestSpec> alterSet() {
