This is an automated email from the ASF dual-hosted git repository.

AHeise pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 131d7c69018ed144f011c855dd2075a3f355f05b
Author: Arvid Heise <[email protected]>
AuthorDate: Mon May 18 21:12:17 2026 +0200

    [FLINK-39700][table-api-java] Consolidate query-change and drop validation into AlterMaterializedTableChangeOperation
    
    Validation that was scattered across MaterializedTableChangeHandler
    (checkForChangedPositionByQuery, droppedPersistedCnt, validationErrors) and
    SqlAlterMaterializedTableDropSchemaConverter is consolidated into
    AlterMaterializedTableChangeOperation.validateChanges(). The handler becomes a
    pure applier: it throws immediately on unknown changes and applies DropColumn
    unconditionally.
    
    Same errors are thrown, just from the right layer. Tests for both layers
    (AlterMaterializedTableChangeOperationValidationTest,
    AlterMaterializedTableAsQueryOperationValidationTest,
    MaterializedTableChangeHandlerTest) are added alongside the prod change.
---
 .../AlterMaterializedTableChangeOperation.java     | 122 ++++++++++++-
 .../MaterializedTableChangeHandler.java            | 150 +---------------
 ...ializedTableAsQueryOperationValidationTest.java | 199 +++++++++++++++++++++
 ...rializedTableChangeOperationValidationTest.java | 189 +++++++++++++++++++
 .../MaterializedTableChangeHandlerTest.java        | 195 ++++++++++++++++++++
 ...lAlterMaterializedTableDropSchemaConverter.java |   9 -
 ...erializedTableNodeToOperationConverterTest.java |  10 +-
 7 files changed, 716 insertions(+), 158 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
index 13a3019a5c7..2961dc88a10 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
@@ -18,22 +18,34 @@
 
 package org.apache.flink.table.operations.materializedtable;
 
+import org.apache.flink.annotation.Confluent;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableResultImpl;
 import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableChange.After;
+import org.apache.flink.table.catalog.TableChange.ColumnPosition;
+import org.apache.flink.table.catalog.TableChange.DropColumn;
+import org.apache.flink.table.catalog.TableChange.ModifyColumnPosition;
 import org.apache.flink.table.catalog.TableChange.ModifyDefinitionQuery;
+import org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType;
 import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler;
 import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
 import org.apache.flink.table.catalog.TableChange.ModifyStartMode;
 import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * Alter materialized table with new table definition and table changes 
represents the modification.
@@ -42,7 +54,7 @@ import java.util.stream.Collectors;
 public class AlterMaterializedTableChangeOperation extends 
AlterMaterializedTableOperation {
 
     private final Function<ResolvedCatalogMaterializedTable, 
List<TableChange>> tableChangeForTable;
-    private final ResolvedCatalogMaterializedTable oldTable;
+    private ResolvedCatalogMaterializedTable oldTable;
     private MaterializedTableChangeHandler handler;
     private CatalogMaterializedTable newTable;
     private List<TableChange> tableChanges;
@@ -77,6 +89,17 @@ public class AlterMaterializedTableChangeOperation extends 
AlterMaterializedTabl
         return newTable;
     }
 
+    @Confluent
+    public ResolvedCatalogMaterializedTable getOldTable() {
+        return oldTable;
+    }
+
+    @Confluent
+    public void setOldTable(final ResolvedCatalogMaterializedTable oldTable) {
+        this.oldTable = oldTable;
+        this.newTable = null;
+    }
+
     protected MaterializedTableChangeHandler getHandlerWithChanges() {
         if (handler == null) {
             handler =
@@ -86,8 +109,105 @@ public class AlterMaterializedTableChangeOperation extends 
AlterMaterializedTabl
         return handler;
     }
 
+    @VisibleForTesting
+    public void validateChanges() {
+        final List<TableChange> changes = getTableChanges();
+        final boolean isQueryChange =
+                
changes.stream().anyMatch(ModifyDefinitionQuery.class::isInstance);
+        final List<Column> oldColumns = 
oldTable.getResolvedSchema().getColumns();
+        final Map<String, Integer> columnIndex =
+                IntStream.range(0, oldColumns.size())
+                        .boxed()
+                        .collect(
+                                Collectors.toMap(
+                                        i -> oldColumns.get(i).getName(), 
Function.identity()));
+        final List<String> errors = new ArrayList<>();
+        for (final TableChange change : changes) {
+            if (change instanceof DropColumn) {
+                checkDroppedColumn((DropColumn) change, oldColumns, 
columnIndex, errors);
+            } else if (isQueryChange && change instanceof 
ModifyColumnPosition) {
+                checkPositionChange((ModifyColumnPosition) change, oldColumns, 
columnIndex, errors);
+            } else if (isQueryChange && change instanceof 
ModifyPhysicalColumnType) {
+                checkPhysicalTypeChange(
+                        (ModifyPhysicalColumnType) change, oldColumns, 
columnIndex, errors);
+            }
+        }
+        if (!errors.isEmpty()) {
+            throw new ValidationException(String.join("\n", errors));
+        }
+    }
+
+    private void checkDroppedColumn(
+            DropColumn change,
+            List<Column> oldColumns,
+            Map<String, Integer> columnIndex,
+            List<String> errors) {
+        final Integer idx = columnIndex.get(change.getColumnName());
+        if (idx != null && oldColumns.get(idx).isPersisted()) {
+            errors.add(
+                    String.format(
+                            "Dropping of persisted column `%s` is not 
supported.",
+                            change.getColumnName()));
+        }
+    }
+
+    private void checkPositionChange(
+            ModifyColumnPosition change,
+            List<Column> oldColumns,
+            Map<String, Integer> columnIndex,
+            List<String> errors) {
+        final int index = displacedColumnIndex(change.getNewPosition(), 
columnIndex);
+        if (index < 0) {
+            return;
+        }
+        errors.add(
+                positionChangeError(
+                        oldColumns.get(index).asSummaryString(),
+                        change.getNewColumn().asSummaryString(),
+                        index));
+    }
+
+    private void checkPhysicalTypeChange(
+            ModifyPhysicalColumnType change,
+            List<Column> oldColumns,
+            Map<String, Integer> columnIndex,
+            List<String> errors) {
+        final Integer idx = columnIndex.get(change.getOldColumn().getName());
+        if (idx != null) {
+            errors.add(
+                    positionChangeError(
+                            change.getOldColumn().asSummaryString(),
+                            change.getNewColumn().asSummaryString(),
+                            idx));
+        }
+    }
+
+    /**
+     * Returns the index in oldColumns of the column displaced by this 
position change, or -1 if
+     * positioned after a column absent from the old schema (i.e., appended 
after a new column).
+     */
+    private static int displacedColumnIndex(
+            ColumnPosition position, Map<String, Integer> columnIndex) {
+        if (position == ColumnPosition.first()) {
+            return 0;
+        }
+        final Integer afterIdx = columnIndex.get(((After) position).column());
+        // afterIdx == null  → after a new column not in the old schema → 
treat as append
+        // afterIdx == last  → after the last old column → also treat as 
append (no displacement)
+        return afterIdx != null && afterIdx < columnIndex.size() - 1 ? 
afterIdx + 1 : -1;
+    }
+
+    private static String positionChangeError(String oldColumn, String 
newColumn, int position) {
+        return String.format(
+                "When modifying the query of a materialized table, "
+                        + "currently only support appending columns at the end 
of original schema, dropping, renaming, and reordering columns are not 
supported.\n"
+                        + "Column mismatch at position %d: Original column is 
[%s], but new column is [%s].",
+                position + 1, oldColumn, newColumn);
+    }
+
     @Override
     public TableResultInternal execute(Context ctx) {
+        validateChanges();
         ctx.getCatalogManager()
                 .alterTable(getNewTable(), getTableChanges(), 
getTableIdentifier(), false);
         return TableResultImpl.TABLE_RESULT_OK;
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
index 54a55414a8e..7521c94a791 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
@@ -58,7 +58,6 @@ import org.apache.flink.table.types.DataType;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.LinkedList;
@@ -66,16 +65,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
 
-/** Applying table changes to old materialized table and gathering validation 
errors. */
+/** Applies table changes to a materialized table and builds the resulting 
definition. */
 @Internal
 public class MaterializedTableChangeHandler {
     private static final HandlerRegistry HANDLER_REGISTRY = 
createHandlerRegistry();
 
     private final List<UnresolvedColumn> columns;
     private final CatalogMaterializedTable oldTable;
-    private boolean isQueryChange;
     private @Nullable TableDistribution distribution;
     private CatalogMaterializedTable.RefreshStatus refreshStatus;
     private @Nullable String refreshHandlerDesc;
@@ -83,12 +80,10 @@ public class MaterializedTableChangeHandler {
     private List<UnresolvedWatermarkSpec> watermarkSpecs;
     private String primaryKeyName = null;
     private List<String> primaryKeyColumns = null;
-    private int droppedPersistedCnt = 0;
     private String originalQuery;
     private String expandedQuery;
     private StartMode startMode;
     private final Map<String, String> options;
-    private final List<String> validationErrors = new ArrayList<>();
 
     public MaterializedTableChangeHandler(CatalogMaterializedTable oldTable) {
         this.distribution = oldTable.getDistribution().orElse(null);
@@ -122,10 +117,10 @@ public class MaterializedTableChangeHandler {
         private void apply(MaterializedTableChangeHandler context, TableChange 
change) {
             HandlerRegistry.HandlerWrapper<?> wrapper = 
HANDLERS.get(change.getClass());
             if (wrapper == null) {
-                context.validationErrors.add("Unknown table change " + 
change.getClass());
-            } else {
-                wrapper.accept(context, change);
+                throw new ValidationException(
+                        "Unsupported table change: " + 
change.getClass().getName());
             }
+            wrapper.accept(context, change);
         }
 
         private static final class HandlerWrapper<T extends TableChange> {
@@ -141,24 +136,15 @@ public class MaterializedTableChangeHandler {
         }
     }
 
-    public List<String> getValidationErrors() {
-        return List.copyOf(validationErrors);
-    }
-
     public static MaterializedTableChangeHandler getHandlerWithChanges(
             CatalogMaterializedTable oldTable, List<TableChange> tableChanges) 
{
-        MaterializedTableChangeHandler handler = new 
MaterializedTableChangeHandler(oldTable);
+        final MaterializedTableChangeHandler handler = new 
MaterializedTableChangeHandler(oldTable);
         handler.applyTableChanges(tableChanges);
         return handler;
     }
 
     public static CatalogMaterializedTable buildNewMaterializedTable(
             MaterializedTableChangeHandler context) {
-        final List<String> validationErrors = context.getValidationErrors();
-        if (!validationErrors.isEmpty()) {
-            throw new ValidationException(String.join("\n", validationErrors));
-        }
-
         final CatalogMaterializedTable oldTable = context.getOldTable();
         return CatalogMaterializedTable.newBuilder()
                 .schema(context.retrieveSchema())
@@ -217,6 +203,9 @@ public class MaterializedTableChangeHandler {
         registry.register(
                 ModifyRefreshStatus.class, 
MaterializedTableChangeHandler::modifyRefreshStatus);
 
+        // Start mode
+        registry.register(ModifyStartMode.class, 
MaterializedTableChangeHandler::modifyStartMode);
+
         // Distribution operations
         registry.register(AddDistribution.class, 
MaterializedTableChangeHandler::addDistribution);
         registry.register(
@@ -233,25 +222,9 @@ public class MaterializedTableChangeHandler {
     }
 
     void applyTableChanges(List<TableChange> tableChanges) {
-        isQueryChange = tableChanges.stream().anyMatch(t -> t instanceof 
ModifyDefinitionQuery);
-        Schema oldSchema = oldTable.getUnresolvedSchema();
-        if (isQueryChange) {
-            checkForChangedPositionByQuery(tableChanges, oldSchema);
-        }
-
         for (TableChange tableChange : tableChanges) {
             HANDLER_REGISTRY.apply(this, tableChange);
         }
-
-        if (droppedPersistedCnt > 0 && isQueryChange) {
-            final int schemaSize = oldSchema.getColumns().size();
-            validationErrors.add(
-                    String.format(
-                            "Failed to modify query because drop column is 
unsupported. "
-                                    + "When modifying a query, you can only 
append new columns at the end of original schema. "
-                                    + "The original schema has %d columns, but 
the newly derived schema from the query has %d columns.",
-                            schemaSize, schemaSize - droppedPersistedCnt));
-        }
     }
 
     public Schema retrieveSchema() {
@@ -324,15 +297,7 @@ public class MaterializedTableChangeHandler {
     }
 
     private void dropColumn(TableChange.DropColumn dropColumn) {
-        String droppedColumnName = dropColumn.getColumnName();
-        int index = getColumnIndex(droppedColumnName);
-        UnresolvedColumn column = columns.get(index);
-        if (isQueryChange && isNonPersistedColumn(column)) {
-            // noop
-        } else {
-            columns.remove(index);
-            droppedPersistedCnt++;
-        }
+        columns.remove(getColumnIndex(dropColumn.getColumnName()));
     }
 
     private void modifyPhysicalColumnType(ModifyPhysicalColumnType 
modifyPhysicalColumnType) {
@@ -350,15 +315,6 @@ public class MaterializedTableChangeHandler {
     private void modifyColumnPosition(ModifyColumnPosition 
columnWithChangedPosition) {
         Column column = columnWithChangedPosition.getOldColumn();
         int oldPosition = getColumnIndex(column.getName());
-        if (isQueryChange) {
-            validationErrors.add(
-                    String.format(
-                            "When modifying the query of a materialized table, 
"
-                                    + "currently only support appending 
columns at the end of original schema, dropping, renaming, and reordering 
columns are not supported.\n"
-                                    + "Column mismatch at position %d: 
Original column is [%s], but new column is [%s].",
-                            oldPosition + 1, column, column));
-        }
-
         ColumnPosition position = columnWithChangedPosition.getNewPosition();
         UnresolvedColumn changedPositionColumn = columns.get(oldPosition);
         columns.remove(oldPosition);
@@ -370,12 +326,6 @@ public class MaterializedTableChangeHandler {
         originalQuery = queryChange.getOriginalQuery();
     }
 
-    private boolean isNonPersistedColumn(UnresolvedColumn column) {
-        return column instanceof Schema.UnresolvedComputedColumn
-                || column instanceof Schema.UnresolvedMetadataColumn
-                        && ((Schema.UnresolvedMetadataColumn) 
column).isVirtual();
-    }
-
     private void addUniqueConstraint(AddUniqueConstraint addUniqueConstraint) {
         final UniqueConstraint constraint = 
addUniqueConstraint.getConstraint();
         primaryKeyName = constraint.getName();
@@ -461,88 +411,6 @@ public class MaterializedTableChangeHandler {
         }
     }
 
-    private void checkForChangedPositionByQuery(List<TableChange> 
tableChanges, Schema oldSchema) {
-        List<ModifyColumnPosition> positionChanges =
-                tableChanges.stream()
-                        .filter(t -> t instanceof ModifyColumnPosition)
-                        .map(t -> (ModifyColumnPosition) t)
-                        .collect(Collectors.toList());
-
-        List<ModifyPhysicalColumnType> physicalTypeChanges =
-                tableChanges.stream()
-                        .filter(t -> t instanceof ModifyPhysicalColumnType)
-                        .map(t -> (ModifyPhysicalColumnType) t)
-                        .collect(Collectors.toList());
-
-        if (positionChanges.isEmpty() && physicalTypeChanges.isEmpty()) {
-            return;
-        }
-
-        int persistedColumnOffset = 0;
-        List<UnresolvedColumn> oldColumns = oldSchema.getColumns();
-        for (UnresolvedColumn column : oldColumns) {
-            if (!isNonPersistedColumn(column)) {
-                persistedColumnOffset++;
-            }
-        }
-
-        Map<String, Column> afterToColumnName = new HashMap<>();
-        for (ModifyColumnPosition change : positionChanges) {
-            final ColumnPosition position = change.getNewPosition();
-            final Column newColumn = change.getNewColumn();
-            if (position == ColumnPosition.first()) {
-                if (persistedColumnOffset == 0) {
-                    positionChangeError(
-                            newColumn.asSummaryString(),
-                            oldColumns.get(persistedColumnOffset).toString(),
-                            persistedColumnOffset);
-                } else {
-                    afterToColumnName.put(
-                            oldColumns.get(persistedColumnOffset).getName(), 
newColumn);
-                }
-            } else {
-                afterToColumnName.put(((After) position).column(), newColumn);
-            }
-        }
-
-        for (int i = 0; i < oldColumns.size() - 1; i++) {
-            Column newColumn = 
afterToColumnName.get(oldColumns.get(i).getName());
-            if (newColumn != null) {
-                positionChangeError(oldColumns.get(i + 1), newColumn, i + 1);
-            }
-        }
-
-        Map<String, Integer> nameToIndex = new HashMap<>();
-        for (int i = 0; i < oldColumns.size(); i++) {
-            UnresolvedColumn column = oldColumns.get(i);
-            if (!isNonPersistedColumn(column)) {
-                nameToIndex.put(column.getName(), i);
-            }
-        }
-
-        for (ModifyPhysicalColumnType change : physicalTypeChanges) {
-            final int index = nameToIndex.get(change.getOldColumn().getName());
-            positionChangeError(change.getOldColumn(), change.getNewColumn(), 
index);
-        }
-    }
-
-    private void positionChangeError(UnresolvedColumn oldColumn, Column 
newColumn, int position) {
-        positionChangeError(oldColumn.toString(), newColumn.asSummaryString(), 
position);
-    }
-
-    private void positionChangeError(Column oldColumn, Column newColumn, int 
position) {
-        positionChangeError(oldColumn.asSummaryString(), 
newColumn.asSummaryString(), position);
-    }
-
-    private void positionChangeError(String oldColumn, String newColumn, int 
position) {
-        validationErrors.add(
-                String.format(
-                        "When modifying the query of a materialized table, "
-                                + "currently only support appending columns at 
the end of original schema, dropping, renaming, and reordering columns are not 
supported.\n"
-                                + "Column mismatch at position %d: Original 
column is [%s], but new column is [%s].",
-                        position + 1, oldColumn, newColumn));
-    }
-
     private void setColumnAtPosition(UnresolvedColumn column, ColumnPosition 
position) {
         if (position == null) {
             columns.add(column);
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperationValidationTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperationValidationTest.java
new file mode 100644
index 00000000000..8a513278bf7
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperationValidationTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableChange.ColumnPosition;
+import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.apache.flink.table.catalog.Column.computed;
+import static org.apache.flink.table.catalog.Column.physical;
+import static 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperationValidationTest.resolvedTable;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for validation of {@link AlterMaterializedTableAsQueryOperation} 
(ALTER MATERIALIZED TABLE
+ * ... AS &lt;query&gt;). CREATE OR ALTER goes through {@link 
FullAlterMaterializedTableOperation}
+ * and is exercised separately by the planner integration tests.
+ */
+class AlterMaterializedTableAsQueryOperationValidationTest {
+
+    private static final TableChange QUERY_CHANGE =
+            TableChange.modifyDefinitionQuery(
+                    "SELECT a, b FROM src", "SELECT `src`.`a`, `src`.`b` FROM 
`src`");
+
+    @Test
+    void rejectDropPersistedColumn() {
+        final ResolvedCatalogMaterializedTable oldTable =
+                resolvedTable(
+                        ResolvedSchema.of(
+                                physical("a", DataTypes.INT()), physical("b", 
DataTypes.STRING())));
+
+        final AlterMaterializedTableAsQueryOperation op =
+                operation(
+                        oldTable,
+                        List.of(
+                                TableChange.modifyDefinitionQuery(
+                                        "SELECT b FROM src", "SELECT `src`.`b` 
FROM `src`"),
+                                TableChange.dropColumn("a")));
+
+        assertThatThrownBy(op::validateChanges)
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Dropping of persisted column `a` is not 
supported.");
+    }
+
+    @Test
+    void acceptDropNonPersistedColumn() {
+        final ResolvedCatalogMaterializedTable oldTable =
+                resolvedTable(
+                        ResolvedSchema.of(
+                                physical("a", DataTypes.INT()),
+                                computed(
+                                        "comp",
+                                        new ResolvedExpressionMock(
+                                                DataTypes.STRING(), () -> 
"UPPER(a)"))));
+
+        final AlterMaterializedTableAsQueryOperation op =
+                operation(
+                        oldTable,
+                        List.of(
+                                TableChange.modifyDefinitionQuery(
+                                        "SELECT a FROM src2", "SELECT 
`src2`.`a` FROM `src2`"),
+                                TableChange.dropColumn("comp")));
+
+        assertThatNoException().isThrownBy(op::validateChanges);
+    }
+
+    @Test
+    void rejectReorderColumn() {
+        final ResolvedCatalogMaterializedTable oldTable =
+                resolvedTable(
+                        ResolvedSchema.of(
+                                physical("a", DataTypes.INT()),
+                                physical("b", DataTypes.STRING()),
+                                physical("c", DataTypes.BIGINT())));
+
+        final AlterMaterializedTableAsQueryOperation op =
+                operation(
+                        oldTable,
+                        List.of(
+                                TableChange.modifyDefinitionQuery(
+                                        "SELECT a, c, b FROM src",
+                                        "SELECT `src`.`a`, `src`.`c`, 
`src`.`b` FROM `src`"),
+                                TableChange.modifyColumnPosition(
+                                        physical("c", DataTypes.BIGINT()),
+                                        ColumnPosition.after("a"))));
+
+        assertThatThrownBy(op::validateChanges)
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Column mismatch at position 2: Original column is 
[`b` STRING], but new column is [`c` BIGINT].");
+    }
+
+    @Test
+    void rejectReorderColumnToFirst() {
+        final ResolvedCatalogMaterializedTable oldTable =
+                resolvedTable(
+                        ResolvedSchema.of(
+                                physical("a", DataTypes.INT()), physical("b", 
DataTypes.STRING())));
+
+        final AlterMaterializedTableAsQueryOperation op =
+                operation(
+                        oldTable,
+                        List.of(
+                                TableChange.modifyDefinitionQuery(
+                                        "SELECT b, a FROM src",
+                                        "SELECT `src`.`b`, `src`.`a` FROM 
`src`"),
+                                TableChange.modifyColumnPosition(
+                                        physical("b", DataTypes.STRING()),
+                                        ColumnPosition.first())));
+
+        assertThatThrownBy(op::validateChanges)
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Column mismatch at position 1: Original column is 
[`a` INT], but new column is [`b` STRING].");
+    }
+
+    @Test
+    void rejectTypeChange() {
+        final ResolvedCatalogMaterializedTable oldTable =
+                resolvedTable(
+                        ResolvedSchema.of(
+                                physical("a", DataTypes.INT()), physical("b", 
DataTypes.STRING())));
+
+        final AlterMaterializedTableAsQueryOperation op =
+                operation(
+                        oldTable,
+                        List.of(
+                                TableChange.modifyDefinitionQuery(
+                                        "SELECT CAST(a AS BIGINT), b FROM src",
+                                        "SELECT CAST(`src`.`a` AS BIGINT), 
`src`.`b` FROM `src`"),
+                                TableChange.modifyPhysicalColumnType(
+                                        physical("a", DataTypes.INT()), 
DataTypes.BIGINT())));
+
+        assertThatThrownBy(op::validateChanges)
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Column mismatch at position 1: Original column is 
[`a` INT], but new column is [`a` BIGINT].");
+    }
+
+    @Test
+    void acceptAppendColumn() {
+        final ResolvedCatalogMaterializedTable oldTable =
+                resolvedTable(
+                        ResolvedSchema.of(
+                                physical("a", DataTypes.INT()), physical("b", 
DataTypes.STRING())));
+
+        final AlterMaterializedTableAsQueryOperation op =
+                operation(
+                        oldTable,
+                        List.of(QUERY_CHANGE, TableChange.add(physical("c", 
DataTypes.BIGINT()))));
+
+        assertThatNoException().isThrownBy(op::validateChanges);
+    }
+
+    @Test
+    void acceptQueryOnlyChange() {
+        final ResolvedCatalogMaterializedTable oldTable =
+                resolvedTable(
+                        ResolvedSchema.of(
+                                physical("a", DataTypes.INT()), physical("b", 
DataTypes.STRING())));
+
+        final AlterMaterializedTableAsQueryOperation op =
+                operation(oldTable, List.of(QUERY_CHANGE));
+
+        assertThatNoException().isThrownBy(op::validateChanges);
+    }
+
+    private static AlterMaterializedTableAsQueryOperation operation(
+            ResolvedCatalogMaterializedTable oldTable, List<TableChange> 
changes) {
+        return new AlterMaterializedTableAsQueryOperation(
+                ObjectIdentifier.of("cat", "db", "mt"), ignored -> changes, 
oldTable);
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperationValidationTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperationValidationTest.java
new file mode 100644
index 00000000000..5497e19fb7f
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperationValidationTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus;
+import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.StartMode;
+import org.apache.flink.table.catalog.StartMode.StartModeKind;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.catalog.Column.computed;
+import static org.apache.flink.table.catalog.Column.metadata;
+import static org.apache.flink.table.catalog.Column.physical;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link AlterMaterializedTableChangeOperation#validateChanges()} when the operation
+ * carries explicit DDL changes (no query change). Query-change validation is exercised by {@link
+ * AlterMaterializedTableAsQueryOperationValidationTest}.
+ */
+class AlterMaterializedTableChangeOperationValidationTest {
+
+    @Test
+    void rejectDropPersistedColumn() {
+        final ResolvedCatalogMaterializedTable oldTable =
+                resolvedTable(
+                        ResolvedSchema.of(
+                                physical("a", DataTypes.INT()), physical("b", 
DataTypes.STRING())));
+
+        final AlterMaterializedTableChangeOperation op =
+                operation(oldTable, List.of(TableChange.dropColumn("a")));
+
+        assertThatThrownBy(op::validateChanges)
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Dropping of persisted column `a` is not 
supported.");
+    }
+
+    @Test
+    void rejectDropPersistedMetadataColumn() {
+        final ResolvedCatalogMaterializedTable oldTable =
+                resolvedTable(
+                        ResolvedSchema.of(
+                                physical("a", DataTypes.INT()),
+                                metadata("m", DataTypes.STRING(), null, 
false)));
+
+        final AlterMaterializedTableChangeOperation op =
+                operation(oldTable, List.of(TableChange.dropColumn("m")));
+
+        assertThatThrownBy(op::validateChanges)
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Dropping of persisted column `m` is not 
supported.");
+    }
+
+    @Test
+    void acceptDropComputedColumn() {
+        final ResolvedCatalogMaterializedTable oldTable =
+                resolvedTable(
+                        ResolvedSchema.of(
+                                physical("a", DataTypes.INT()),
+                                computed(
+                                        "c",
+                                        new ResolvedExpressionMock(
+                                                DataTypes.STRING(), () -> 
"UPPER(a)"))));
+
+        final AlterMaterializedTableChangeOperation op =
+                operation(oldTable, List.of(TableChange.dropColumn("c")));
+
+        assertThatNoException().isThrownBy(op::validateChanges);
+    }
+
+    @Test
+    void acceptDropVirtualMetadataColumn() {
+        final ResolvedCatalogMaterializedTable oldTable =
+                resolvedTable(
+                        ResolvedSchema.of(
+                                physical("a", DataTypes.INT()),
+                                metadata("v", DataTypes.STRING(), null, 
true)));
+
+        final AlterMaterializedTableChangeOperation op =
+                operation(oldTable, List.of(TableChange.dropColumn("v")));
+
+        assertThatNoException().isThrownBy(op::validateChanges);
+    }
+
+    @Test
+    void acceptDropNonExistentColumn() {
+        final ResolvedCatalogMaterializedTable oldTable =
+                resolvedTable(
+                        ResolvedSchema.of(
+                                physical("a", DataTypes.INT()), physical("b", 
DataTypes.STRING())));
+
+        final AlterMaterializedTableChangeOperation op =
+                operation(oldTable, 
List.of(TableChange.dropColumn("does-not-exist")));
+
+        // No column to validate against; planner-level checks handle this elsewhere.
+        assertThatNoException().isThrownBy(op::validateChanges);
+    }
+
+    @Test
+    void acceptAddColumn() {
+        final ResolvedCatalogMaterializedTable oldTable =
+                resolvedTable(
+                        ResolvedSchema.of(
+                                physical("a", DataTypes.INT()), physical("b", 
DataTypes.STRING())));
+
+        final AlterMaterializedTableChangeOperation op =
+                operation(oldTable, List.of(TableChange.add(physical("c", 
DataTypes.BIGINT()))));
+
+        assertThatNoException().isThrownBy(op::validateChanges);
+    }
+
+    @Test
+    void acceptCommentChange() {
+        final ResolvedCatalogMaterializedTable oldTable =
+                resolvedTable(
+                        ResolvedSchema.of(
+                                physical("a", DataTypes.INT()), physical("b", 
DataTypes.STRING())));
+
+        final AlterMaterializedTableChangeOperation op =
+                operation(
+                        oldTable,
+                        List.of(
+                                TableChange.modifyColumnComment(
+                                        physical("a", DataTypes.INT()), "new 
comment")));
+
+        assertThatNoException().isThrownBy(op::validateChanges);
+    }
+
+    static ResolvedCatalogMaterializedTable resolvedTable(ResolvedSchema 
resolvedSchema) {
+        final CatalogMaterializedTable origin =
+                CatalogMaterializedTable.newBuilder()
+                        
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
+                        .comment("")
+                        .partitionKeys(List.of())
+                        .options(Map.of())
+                        .originalQuery("SELECT a FROM src")
+                        .expandedQuery("SELECT `src`.`a` FROM `src`")
+                        .freshness(IntervalFreshness.ofSecond(60))
+                        .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC)
+                        .refreshMode(RefreshMode.CONTINUOUS)
+                        .refreshStatus(RefreshStatus.INITIALIZING)
+                        .startMode(StartMode.of(StartModeKind.FROM_BEGINNING))
+                        .build();
+        return new ResolvedCatalogMaterializedTable(
+                origin,
+                resolvedSchema,
+                RefreshMode.CONTINUOUS,
+                IntervalFreshness.ofSecond(60),
+                StartMode.of(StartModeKind.FROM_BEGINNING));
+    }
+
+    static AlterMaterializedTableChangeOperation operation(
+            ResolvedCatalogMaterializedTable oldTable, List<TableChange> 
changes) {
+        return new AlterMaterializedTableChangeOperation(
+                ObjectIdentifier.of("cat", "db", "mt"), ignored -> changes, 
oldTable);
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandlerTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandlerTest.java
new file mode 100644
index 00000000000..3c5e4bdbf09
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandlerTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.StartMode;
+import org.apache.flink.table.catalog.StartMode.StartModeKind;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableChange.ColumnPosition;
+import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.InstanceOfAssertFactories.type;
+
+/**
+ * Tests for {@link MaterializedTableChangeHandler} — pure applier behavior exercised without the
+ * planner stack. Validation lives in {@link
+ * AlterMaterializedTableChangeOperation#validateChanges()}; tests there assert what is rejected.
+ */
+class MaterializedTableChangeHandlerTest {
+
+    /** Default schema used by tests that don't refine it: [a INT, b STRING]. */
+    private static final Schema DEFAULT_SCHEMA =
+            Schema.newBuilder()
+                    .column("a", DataTypes.INT())
+                    .column("b", DataTypes.STRING())
+                    .build();
+
+    private static CatalogMaterializedTable.Builder defaultBuilder() {
+        return CatalogMaterializedTable.newBuilder()
+                .schema(DEFAULT_SCHEMA)
+                .comment("")
+                .partitionKeys(List.of())
+                .options(Map.of())
+                .originalQuery("dummy")
+                .expandedQuery("dummy")
+                .freshness(IntervalFreshness.ofSecond(60))
+                .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC)
+                .refreshMode(RefreshMode.CONTINUOUS)
+                .refreshStatus(RefreshStatus.INITIALIZING)
+                .startMode(StartMode.of(StartModeKind.FROM_BEGINNING));
+    }
+
+    @Test
+    void shouldDropPersistedColumn() {
+        // old schema: [a INT, b STRING]
+        final CatalogMaterializedTable newTable =
+                applyChanges(defaultBuilder().build(), 
List.of(TableChange.dropColumn("a")));
+
+        assertThat(newTable.getUnresolvedSchema().getColumns())
+                .extracting(Schema.UnresolvedColumn::getName)
+                .containsExactly("b");
+    }
+
+    @Test
+    void shouldDropComputedColumn() {
+        // old schema: [a INT, comp AS UPPER(a)]
+        final CatalogMaterializedTable tableWithComputed =
+                defaultBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("a", DataTypes.INT())
+                                        .columnByExpression("comp", "UPPER(a)")
+                                        .build())
+                        .build();
+
+        final CatalogMaterializedTable newTable =
+                applyChanges(tableWithComputed, 
List.of(TableChange.dropColumn("comp")));
+
+        assertThat(newTable.getUnresolvedSchema().getColumns())
+                .extracting(Schema.UnresolvedColumn::getName)
+                .containsExactly("a");
+    }
+
+    @Test
+    void shouldDropVirtualMetadataColumn() {
+        // old schema: [a INT, v STRING METADATA VIRTUAL]
+        final CatalogMaterializedTable tableWithMetadata =
+                defaultBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("a", DataTypes.INT())
+                                        .columnByMetadata("v", 
DataTypes.STRING(), null, true)
+                                        .build())
+                        .build();
+
+        final CatalogMaterializedTable newTable =
+                applyChanges(tableWithMetadata, 
List.of(TableChange.dropColumn("v")));
+
+        assertThat(newTable.getUnresolvedSchema().getColumns())
+                .extracting(Schema.UnresolvedColumn::getName)
+                .containsExactly("a");
+    }
+
+    @Test
+    void shouldApplyAddColumnAfter() {
+        // old schema: [a INT, b STRING]
+        final CatalogMaterializedTable newTable =
+                applyChanges(
+                        defaultBuilder().build(),
+                        List.of(
+                                TableChange.add(
+                                        Column.physical("x", 
DataTypes.BIGINT()),
+                                        ColumnPosition.after("a"))));
+
+        assertThat(newTable.getUnresolvedSchema().getColumns())
+                .extracting(Schema.UnresolvedColumn::getName)
+                .containsExactly("a", "x", "b");
+        assertThat(newTable.getUnresolvedSchema().getColumns())
+                .element(1)
+                .asInstanceOf(type(Schema.UnresolvedPhysicalColumn.class))
+                .returns(DataTypes.BIGINT(), 
Schema.UnresolvedPhysicalColumn::getDataType);
+    }
+
+    @Test
+    void shouldApplyAddComputedColumnFirst() {
+        // old schema: [a INT, b STRING]
+        final CatalogMaterializedTable newTable =
+                applyChanges(
+                        defaultBuilder().build(),
+                        List.of(
+                                TableChange.add(
+                                        Column.computed(
+                                                "comp",
+                                                new ResolvedExpressionMock(
+                                                        DataTypes.INT(), () -> 
"1")),
+                                        ColumnPosition.first())));
+
+        assertThat(newTable.getUnresolvedSchema().getColumns())
+                .extracting(Schema.UnresolvedColumn::getName)
+                .containsExactly("comp", "a", "b");
+    }
+
+    @Test
+    void shouldApplyAddVirtualMetadataColumn() {
+        // old schema: [a INT, b STRING]
+        final CatalogMaterializedTable newTable =
+                applyChanges(
+                        defaultBuilder().build(),
+                        List.of(
+                                TableChange.add(
+                                        Column.metadata("m", 
DataTypes.STRING(), null, true))));
+
+        assertThat(newTable.getUnresolvedSchema().getColumns())
+                .extracting(Schema.UnresolvedColumn::getName)
+                .containsExactly("a", "b", "m");
+    }
+
+    @Test
+    void shouldRejectUnsupportedTableChange() {
+        // old schema: [a INT, b STRING]
+        final TableChange unsupported = new TableChange() {};
+
+        assertThatThrownBy(() -> applyChanges(defaultBuilder().build(), 
List.of(unsupported)))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Unsupported table change");
+    }
+
+    private static CatalogMaterializedTable applyChanges(
+            CatalogMaterializedTable oldTable, List<TableChange> changes) {
+        final MaterializedTableChangeHandler handler =
+                MaterializedTableChangeHandler.getHandlerWithChanges(oldTable, 
changes);
+        return 
MaterializedTableChangeHandler.buildNewMaterializedTable(handler);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java
index e8f8a41b44d..1e889fdbb55 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTab
 import 
org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema.SqlAlterMaterializedTableDropSchema;
 import 
org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema.SqlAlterMaterializedTableDropWatermark;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
@@ -170,14 +169,6 @@ public abstract class 
SqlAlterMaterializedTableDropSchemaConverter<
                     OperationConverterUtils.validateAndGatherDropColumn(
                             oldTable, columnsToDrop, EX_MSG_PREFIX);
             validateColumnsUsedInQuery(oldTable, alterTableSchema, context);
-            for (Column column : oldTable.getResolvedSchema().getColumns()) {
-                if (column.isPersisted() && 
columnsToDrop.contains(column.getName())) {
-                    throw new ValidationException(
-                            String.format(
-                                    "%sThe column `%s` is a persisted column. 
Dropping of persisted columns is not supported.",
-                                    EX_MSG_PREFIX, column.getName()));
-                }
-            }
             return tableChanges;
         }
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 1b458494542..caaf0578db2 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -403,7 +403,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         () -> {
                             AlterMaterializedTableChangeOperation operation =
                                     (AlterMaterializedTableChangeOperation) 
parse(spec.sql);
-                            operation.getNewTable();
+                            operation.validateChanges();
                         })
                 .as(spec.sql)
                 .isInstanceOf(spec.expectedException)
@@ -714,10 +714,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
         return List.of(
                 TestSpec.of(
                         "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b 
FROM t3",
-                        "Failed to modify query because drop column is 
unsupported. When modifying "
-                                + "a query, you can only append new columns at 
the end of original "
-                                + "schema. The original schema has 4 columns, 
but the newly derived "
-                                + "schema from the query has 2 columns."),
+                        "Dropping of persisted column `c` is not supported."),
                 TestSpec.of(
                         "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, d, 
c FROM t3",
                         "When modifying the query of a materialized table, 
currently only support "
@@ -1045,8 +1042,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                 + "Column(s) ('d') are used in query."),
                 TestSpec.of(
                         "ALTER MATERIALIZED TABLE base_mtbl_with_metadata DROP 
m_p",
-                        "Failed to execute ALTER MATERIALIZED TABLE 
statement.\n"
-                                + "The column `m_p` is a persisted column. 
Dropping of persisted columns is not supported."));
+                        "Dropping of persisted column `m_p` is not 
supported."));
     }
 
     private static Collection<TestSpec> alterSet() {

Reply via email to