This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7204c8fd7c8 [FLINK-39077][table] Throw for materialized table change 
query operation in case of non persisted columns in schema
7204c8fd7c8 is described below

commit 7204c8fd7c8cb428c2d4cf45f616e5a4d9cf6295
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Mar 5 22:19:29 2026 +0100

    [FLINK-39077][table] Throw for materialized table change query operation in 
case of non persisted columns in schema
---
 .../MaterializedTableManager.java                  |  48 +-
 .../AlterMaterializedTableAsQueryOperation.java    |  12 +-
 .../AlterMaterializedTableChangeOperation.java     | 535 ++------------------
 .../FullAlterMaterializedTableOperation.java       |  47 ++
 .../MaterializedTableChangeHandler.java            | 552 +++++++++++++++++++++
 .../AbstractAlterMaterializedTableConverter.java   |  33 +-
 ...rMaterializedTableAddDistributionConverter.java |  15 +-
 .../SqlAlterMaterializedTableAsQueryConverter.java |  58 ++-
 ...MaterializedTableDropDistributionConverter.java |   9 +-
 ...lAlterMaterializedTableDropSchemaConverter.java |  17 +-
 ...terializedTableModifyDistributionConverter.java |  15 +-
 .../SqlAlterMaterializedTableSchemaConverter.java  |  35 +-
 ...SqlCreateOrAlterMaterializedTableConverter.java |  67 +--
 .../planner/utils/MaterializedTableUtils.java      |  51 +-
 .../operations/SqlDdlToOperationConverterTest.java |  10 +-
 ...erializedTableNodeToOperationConverterTest.java |  71 ++-
 .../planner/utils/MaterializedTableUtilsTest.java  |  20 -
 17 files changed, 915 insertions(+), 680 deletions(-)

diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index efafb8a3a8c..7279639ce3d 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -325,11 +325,11 @@ public class MaterializedTableManager {
         return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
     }
 
-    private CatalogMaterializedTable suspendContinuousRefreshJob(
+    private ResolvedCatalogMaterializedTable suspendContinuousRefreshJob(
             OperationExecutor operationExecutor,
             OperationHandle handle,
             ObjectIdentifier tableIdentifier,
-            CatalogMaterializedTable materializedTable) {
+            ResolvedCatalogMaterializedTable materializedTable) {
         try {
             ContinuousRefreshHandler refreshHandler =
                     
deserializeContinuousHandler(materializedTable.getSerializedRefreshHandler());
@@ -371,7 +371,7 @@ public class MaterializedTableManager {
             OperationExecutor operationExecutor,
             OperationHandle handle,
             ObjectIdentifier tableIdentifier,
-            CatalogMaterializedTable materializedTable) {
+            ResolvedCatalogMaterializedTable materializedTable) {
         if (RefreshStatus.SUSPENDED == materializedTable.getRefreshStatus()) {
             throw new SqlExecutionException(
                     String.format(
@@ -489,7 +489,7 @@ public class MaterializedTableManager {
             OperationExecutor operationExecutor,
             OperationHandle handle,
             ObjectIdentifier tableIdentifier,
-            CatalogMaterializedTable catalogMaterializedTable,
+            ResolvedCatalogMaterializedTable catalogMaterializedTable,
             Map<String, String> dynamicOptions) {
         // Repeated resume refresh workflow is not supported
         if (RefreshStatus.ACTIVATED == 
catalogMaterializedTable.getRefreshStatus()) {
@@ -577,7 +577,7 @@ public class MaterializedTableManager {
                 operationExecutor,
                 handle,
                 materializedTableIdentifier,
-                catalogMaterializedTable,
+                resolveCatalogMaterializedTable(operationExecutor, 
catalogMaterializedTable),
                 RefreshStatus.ACTIVATED,
                 continuousRefreshHandler.asSummaryString(),
                 serializedBytes);
@@ -822,7 +822,7 @@ public class MaterializedTableManager {
 
         if (RefreshStatus.ACTIVATED == 
oldMaterializedTable.getRefreshStatus()) {
             // 1. suspend the materialized table
-            CatalogMaterializedTable suspendMaterializedTable =
+            ResolvedCatalogMaterializedTable suspendMaterializedTable =
                     suspendContinuousRefreshJob(
                             operationExecutor, handle, tableIdentifier, 
oldMaterializedTable);
 
@@ -830,7 +830,7 @@ public class MaterializedTableManager {
             AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
                     new AlterMaterializedTableChangeOperation(
                             op.getTableIdentifier(),
-                            op.getTableChanges(),
+                            oldTable -> op.getTableChanges(),
                             suspendMaterializedTable);
             operationExecutor.callExecutableOperation(
                     handle, alterMaterializedTableChangeOperation);
@@ -840,8 +840,7 @@ public class MaterializedTableManager {
                 executeContinuousRefreshJob(
                         operationExecutor,
                         handle,
-                        alterMaterializedTableChangeOperation
-                                .getMaterializedTableWithAppliedChanges(),
+                        alterMaterializedTableChangeOperation.getNewTable(),
                         tableIdentifier,
                         Collections.emptyMap(),
                         Optional.empty());
@@ -851,7 +850,7 @@ public class MaterializedTableManager {
                 LOG.warn(
                         "Failed to start the continuous refresh job for 
materialized table {} using new query {}, rollback to origin query {}.",
                         tableIdentifier,
-                        
op.getMaterializedTableWithAppliedChanges().getExpandedQuery(),
+                        op.getNewTable().getExpandedQuery(),
                         suspendMaterializedTable.getExpandedQuery(),
                         e);
 
@@ -874,8 +873,7 @@ public class MaterializedTableManager {
                 throw new SqlExecutionException(
                         String.format(
                                 "Failed to start the continuous refresh job 
using new query %s when altering materialized table %s select query.",
-                                
op.getMaterializedTableWithAppliedChanges().getExpandedQuery(),
-                                tableIdentifier),
+                                op.getNewTable().getExpandedQuery(), 
tableIdentifier),
                         e);
             }
         } else if (RefreshStatus.SUSPENDED == 
oldMaterializedTable.getRefreshStatus()) {
@@ -889,7 +887,7 @@ public class MaterializedTableManager {
 
             AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
                     new AlterMaterializedTableChangeOperation(
-                            tableIdentifier, tableChanges, 
oldMaterializedTable);
+                            tableIdentifier, oldTable -> tableChanges, 
oldMaterializedTable);
 
             operationExecutor.callExecutableOperation(
                     handle, alterMaterializedTableChangeOperation);
@@ -904,11 +902,11 @@ public class MaterializedTableManager {
     }
 
     private AlterMaterializedTableChangeOperation 
generateRollbackAlterMaterializedTableOperation(
-            CatalogMaterializedTable oldMaterializedTable,
+            ResolvedCatalogMaterializedTable oldMaterializedTable,
             AlterMaterializedTableChangeOperation op) {
 
         return new AlterMaterializedTableChangeOperation(
-                op.getTableIdentifier(), List.of(), oldMaterializedTable);
+                op.getTableIdentifier(), oldTable -> List.of(), 
oldMaterializedTable);
     }
 
     private TableChange.ModifyRefreshHandler generateResetSavepointTableChange(
@@ -1158,11 +1156,20 @@ public class MaterializedTableManager {
         return (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable;
     }
 
-    private CatalogMaterializedTable updateRefreshHandler(
+    private ResolvedCatalogMaterializedTable resolveCatalogMaterializedTable(
+            OperationExecutor operationExecutor, CatalogMaterializedTable 
materializedTable) {
+        return operationExecutor
+                .getSessionContext()
+                .getSessionState()
+                .catalogManager
+                .resolveCatalogMaterializedTable(materializedTable);
+    }
+
+    private ResolvedCatalogMaterializedTable updateRefreshHandler(
             OperationExecutor operationExecutor,
             OperationHandle operationHandle,
             ObjectIdentifier materializedTableIdentifier,
-            CatalogMaterializedTable catalogMaterializedTable,
+            ResolvedCatalogMaterializedTable catalogMaterializedTable,
             RefreshStatus refreshStatus,
             String refreshHandlerSummary,
             byte[] serializedRefreshHandler) {
@@ -1172,12 +1179,15 @@ public class MaterializedTableManager {
                 TableChange.modifyRefreshHandler(refreshHandlerSummary, 
serializedRefreshHandler));
         AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
                 new AlterMaterializedTableChangeOperation(
-                        materializedTableIdentifier, tableChanges, 
catalogMaterializedTable);
+                        materializedTableIdentifier,
+                        oldTable -> tableChanges,
+                        catalogMaterializedTable);
         // update RefreshHandler to Catalog
         operationExecutor.callExecutableOperation(
                 operationHandle, alterMaterializedTableChangeOperation);
 
-        return 
alterMaterializedTableChangeOperation.getMaterializedTableWithAppliedChanges();
+        return resolveCatalogMaterializedTable(
+                operationExecutor, 
alterMaterializedTableChangeOperation.getNewTable());
     }
 
     /** Generate insert statement for materialized table. */
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java
index 7ed81b58665..264fc752bfa 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java
@@ -20,11 +20,12 @@ package org.apache.flink.table.operations.materializedtable;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.internal.TableResultInternal;
-import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.TableChange;
 
 import java.util.List;
+import java.util.function.Function;
 
 /**
  * Operation to describe an ALTER MATERIALIZED TABLE AS query operation. The 
operation is not
@@ -38,9 +39,9 @@ public class AlterMaterializedTableAsQueryOperation extends 
AlterMaterializedTab
 
     public AlterMaterializedTableAsQueryOperation(
             ObjectIdentifier tableIdentifier,
-            List<TableChange> tableChanges,
-            CatalogMaterializedTable oldTable) {
-        super(tableIdentifier, tableChanges, oldTable);
+            Function<ResolvedCatalogMaterializedTable, List<TableChange>> 
tableChangesForTable,
+            ResolvedCatalogMaterializedTable oldTable) {
+        super(tableIdentifier, tableChangesForTable, oldTable);
     }
 
     @Override
@@ -53,7 +54,6 @@ public class AlterMaterializedTableAsQueryOperation extends 
AlterMaterializedTab
     public String asSummaryString() {
         return String.format(
                 "ALTER MATERIALIZED TABLE %s AS %s",
-                tableIdentifier.asSummaryString(),
-                getMaterializedTableWithAppliedChanges().getExpandedQuery());
+                tableIdentifier.asSummaryString(), 
getNewTable().getExpandedQuery());
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
index b480cc00083..0930b69f1ae 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
@@ -19,59 +19,19 @@
 package org.apache.flink.table.operations.materializedtable;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.Schema.UnresolvedColumn;
-import org.apache.flink.table.api.Schema.UnresolvedComputedColumn;
-import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn;
-import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
-import org.apache.flink.table.api.Schema.UnresolvedWatermarkSpec;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableResultImpl;
 import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
-import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus;
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.Column.ComputedColumn;
-import org.apache.flink.table.catalog.Column.MetadataColumn;
-import org.apache.flink.table.catalog.Column.PhysicalColumn;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.TableChange;
-import org.apache.flink.table.catalog.TableChange.AddColumn;
-import org.apache.flink.table.catalog.TableChange.AddDistribution;
-import org.apache.flink.table.catalog.TableChange.AddUniqueConstraint;
-import org.apache.flink.table.catalog.TableChange.AddWatermark;
-import org.apache.flink.table.catalog.TableChange.After;
-import org.apache.flink.table.catalog.TableChange.ColumnPosition;
-import org.apache.flink.table.catalog.TableChange.DropColumn;
-import org.apache.flink.table.catalog.TableChange.DropConstraint;
-import org.apache.flink.table.catalog.TableChange.DropDistribution;
-import org.apache.flink.table.catalog.TableChange.DropWatermark;
-import org.apache.flink.table.catalog.TableChange.ModifyColumn;
-import org.apache.flink.table.catalog.TableChange.ModifyColumnComment;
-import org.apache.flink.table.catalog.TableChange.ModifyColumnPosition;
 import org.apache.flink.table.catalog.TableChange.ModifyDefinitionQuery;
-import org.apache.flink.table.catalog.TableChange.ModifyDistribution;
-import org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType;
 import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler;
 import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
-import org.apache.flink.table.catalog.TableChange.ModifyUniqueConstraint;
-import org.apache.flink.table.catalog.TableChange.ModifyWatermark;
-import org.apache.flink.table.catalog.TableDistribution;
-import org.apache.flink.table.catalog.UniqueConstraint;
-import org.apache.flink.table.catalog.WatermarkSpec;
-import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
-import org.apache.flink.table.types.DataType;
 
-import javax.annotation.Nullable;
-
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.BiConsumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -80,90 +40,70 @@ import java.util.stream.Collectors;
 @Internal
 public class AlterMaterializedTableChangeOperation extends 
AlterMaterializedTableOperation {
 
-    private final List<TableChange> tableChanges;
-    private final CatalogMaterializedTable oldTable;
-    private CatalogMaterializedTable materializedTableWithAppliedChanges;
+    private final Function<ResolvedCatalogMaterializedTable, 
List<TableChange>> tableChangeForTable;
+    private final ResolvedCatalogMaterializedTable oldTable;
+    private MaterializedTableChangeHandler handler;
+    private CatalogMaterializedTable newTable;
+    private List<TableChange> tableChanges;
 
-    /**
-     * The original order of table changes should be kept as is in some 
situations different order
-     * might lead to different results.
-     */
     public AlterMaterializedTableChangeOperation(
             ObjectIdentifier tableIdentifier,
-            List<TableChange> tableChanges,
-            CatalogMaterializedTable oldTable) {
-        this(tableIdentifier, tableChanges, oldTable, null);
-    }
-
-    private AlterMaterializedTableChangeOperation(
-            ObjectIdentifier tableIdentifier,
-            List<TableChange> tableChanges,
-            CatalogMaterializedTable oldTable,
-            CatalogMaterializedTable catalogMaterializedTable) {
+            Function<ResolvedCatalogMaterializedTable, List<TableChange>> 
tableChangeForTable,
+            ResolvedCatalogMaterializedTable oldTable) {
         super(tableIdentifier);
-        this.tableChanges = tableChanges;
+        this.tableChangeForTable = tableChangeForTable;
         this.oldTable = oldTable;
-        this.materializedTableWithAppliedChanges = catalogMaterializedTable;
     }
 
     public List<TableChange> getTableChanges() {
+        if (tableChanges == null) {
+            tableChanges = tableChangeForTable.apply(oldTable);
+        }
         return tableChanges;
     }
 
     public AlterMaterializedTableChangeOperation copyAsTableChangeOperation() {
         return new AlterMaterializedTableChangeOperation(
-                tableIdentifier, tableChanges, oldTable, 
materializedTableWithAppliedChanges);
+                tableIdentifier, tableChangeForTable, oldTable);
     }
 
-    public CatalogMaterializedTable getMaterializedTableWithAppliedChanges() {
-        // The only case when materializedTableWithAppliedChanges is not null 
from the beginning
-        // is copyAsTableChangeOperation where it copies already evaluated 
materialized table
-        if (oldTable == null || materializedTableWithAppliedChanges != null) {
-            return materializedTableWithAppliedChanges;
+    public CatalogMaterializedTable getNewTable() {
+        if (newTable == null) {
+            newTable =
+                    MaterializedTableChangeHandler.buildNewMaterializedTable(
+                            getHandlerWithChanges());
         }
+        return newTable;
+    }
 
-        ChangeContext changeContext = new ChangeContext(oldTable);
-        changeContext.applyTableChanges(tableChanges);
-
-        materializedTableWithAppliedChanges =
-                CatalogMaterializedTable.newBuilder()
-                        .schema(changeContext.retrieveSchema())
-                        .comment(oldTable.getComment())
-                        .partitionKeys(oldTable.getPartitionKeys())
-                        .options(changeContext.options)
-                        .originalQuery(changeContext.originalQuery)
-                        .expandedQuery(changeContext.expandedQuery)
-                        .distribution(changeContext.distribution)
-                        .freshness(oldTable.getDefinitionFreshness())
-                        .logicalRefreshMode(oldTable.getLogicalRefreshMode())
-                        .refreshMode(oldTable.getRefreshMode())
-                        .refreshStatus(changeContext.refreshStatus)
-                        
.refreshHandlerDescription(changeContext.refreshHandlerDesc)
-                        
.serializedRefreshHandler(changeContext.refreshHandlerBytes)
-                        .build();
-
-        return materializedTableWithAppliedChanges;
+    protected MaterializedTableChangeHandler getHandlerWithChanges() {
+        if (handler == null) {
+            handler =
+                    MaterializedTableChangeHandler.getHandlerWithChanges(
+                            oldTable, getTableChanges());
+        }
+        return handler;
     }
 
     @Override
     public TableResultInternal execute(Context ctx) {
         ctx.getCatalogManager()
-                .alterTable(
-                        getMaterializedTableWithAppliedChanges(),
-                        getTableChanges(),
-                        getTableIdentifier(),
-                        false);
+                .alterTable(getNewTable(), getTableChanges(), 
getTableIdentifier(), false);
         return TableResultImpl.TABLE_RESULT_OK;
     }
 
     @Override
     public String asSummaryString() {
         String changes =
-                tableChanges.stream()
+                getTableChanges().stream()
                         .map(AlterMaterializedTableChangeOperation::toString)
                         .collect(Collectors.joining(",\n"));
         return String.format(
-                "ALTER MATERIALIZED TABLE %s\n%s", 
tableIdentifier.asSummaryString(), changes);
+                "%s %s\n%s", getOperationName(), 
tableIdentifier.asSummaryString(), changes);
+    }
+
+    protected String getOperationName() {
+        return "ALTER MATERIALIZED TABLE";
     }
 
     private static String toString(TableChange tableChange) {
@@ -184,409 +124,4 @@ public class AlterMaterializedTableChangeOperation 
extends AlterMaterializedTabl
             return AlterTableChangeOperation.toString(tableChange);
         }
     }
-
-    private static class ChangeContext {
-        private static final HandlerRegistry HANDLER_REGISTRY = 
createHandlerRegistry();
-
-        private final List<UnresolvedColumn> columns;
-        private final CatalogMaterializedTable oldTable;
-        private boolean isQueryChange;
-        private @Nullable TableDistribution distribution;
-        private RefreshStatus refreshStatus;
-        private @Nullable String refreshHandlerDesc;
-        private byte[] refreshHandlerBytes;
-        private List<UnresolvedWatermarkSpec> watermarkSpecs;
-        private String primaryKeyName = null;
-        private List<String> primaryKeyColumns = null;
-        private int droppedPersistedCnt = 0;
-        private String originalQuery;
-        private String expandedQuery;
-        private final Map<String, String> options;
-
-        public ChangeContext(CatalogMaterializedTable oldTable) {
-            this.distribution = oldTable.getDistribution().orElse(null);
-            this.refreshStatus = oldTable.getRefreshStatus();
-            this.refreshHandlerDesc = 
oldTable.getRefreshHandlerDescription().orElse(null);
-            this.refreshHandlerBytes = oldTable.getSerializedRefreshHandler();
-            this.watermarkSpecs = 
oldTable.getUnresolvedSchema().getWatermarkSpecs();
-            this.columns = new 
LinkedList<>(oldTable.getUnresolvedSchema().getColumns());
-            Schema.UnresolvedPrimaryKey primaryKey =
-                    
oldTable.getUnresolvedSchema().getPrimaryKey().orElse(null);
-            if (primaryKey != null) {
-                this.primaryKeyName = primaryKey.getConstraintName();
-                this.primaryKeyColumns = primaryKey.getColumnNames();
-            }
-            originalQuery = oldTable.getOriginalQuery();
-            expandedQuery = oldTable.getExpandedQuery();
-            this.oldTable = oldTable;
-            this.options = new HashMap<>(oldTable.getOptions());
-        }
-
-        private static final class HandlerRegistry {
-            private static final Map<Class<? extends TableChange>, 
HandlerWrapper<?>> HANDLERS =
-                    new IdentityHashMap<>();
-
-            private <T extends TableChange> void register(
-                    Class<T> type, BiConsumer<ChangeContext, T> handler) {
-                HANDLERS.put(type, new HandlerWrapper<>(handler));
-            }
-
-            private void apply(ChangeContext context, TableChange change) {
-                HandlerWrapper<?> wrapper = HANDLERS.get(change.getClass());
-                if (wrapper == null) {
-                    throw new ValidationException("Unknown table change " + 
change.getClass());
-                }
-                wrapper.accept(context, change);
-            }
-
-            private static final class HandlerWrapper<T extends TableChange> {
-                private final BiConsumer<ChangeContext, T> handler;
-
-                private HandlerWrapper(BiConsumer<ChangeContext, T> handler) {
-                    this.handler = handler;
-                }
-
-                private void accept(ChangeContext context, TableChange change) 
{
-                    handler.accept(context, (T) change);
-                }
-            }
-        }
-
-        private static HandlerRegistry createHandlerRegistry() {
-            HandlerRegistry registry = new HandlerRegistry();
-
-            // Column operations
-            registry.register(AddColumn.class, ChangeContext::addColumn);
-            registry.register(ModifyColumn.class, ChangeContext::modifyColumn);
-            registry.register(DropColumn.class, ChangeContext::dropColumn);
-            registry.register(
-                    ModifyPhysicalColumnType.class, 
ChangeContext::modifyPhysicalColumnType);
-            registry.register(ModifyColumnComment.class, 
ChangeContext::modifyColumnComment);
-            registry.register(ModifyColumnPosition.class, 
ChangeContext::modifyColumnPosition);
-
-            // Query operations
-            registry.register(ModifyDefinitionQuery.class, 
ChangeContext::modifyDefinitionQuery);
-
-            // Constraint operations
-            registry.register(AddUniqueConstraint.class, 
ChangeContext::addUniqueConstraint);
-            registry.register(ModifyUniqueConstraint.class, 
ChangeContext::modifyUniqueConstraint);
-            registry.register(DropConstraint.class, 
ChangeContext::dropConstraint);
-
-            // Watermark operations
-            registry.register(AddWatermark.class, ChangeContext::addWatermark);
-            registry.register(ModifyWatermark.class, 
ChangeContext::modifyWatermark);
-            registry.register(DropWatermark.class, 
ChangeContext::dropWatermark);
-
-            // Refresh operations
-            registry.register(ModifyRefreshHandler.class, 
ChangeContext::modifyRefreshHandler);
-            registry.register(ModifyRefreshStatus.class, 
ChangeContext::modifyRefreshStatus);
-
-            // Distribution operations
-            registry.register(AddDistribution.class, 
ChangeContext::addDistribution);
-            registry.register(ModifyDistribution.class, 
ChangeContext::modifyDistribution);
-            registry.register(DropDistribution.class, 
ChangeContext::dropDistribution);
-
-            // Options
-            registry.register(TableChange.SetOption.class, 
ChangeContext::setTableOption);
-            registry.register(TableChange.ResetOption.class, 
ChangeContext::resetTableOption);
-
-            return registry;
-        }
-
-        private void applyTableChanges(List<TableChange> tableChanges) {
-            isQueryChange = tableChanges.stream().anyMatch(t -> t instanceof 
ModifyDefinitionQuery);
-            Schema oldSchema = oldTable.getUnresolvedSchema();
-            if (isQueryChange) {
-                checkForChangedPositionByQuery(tableChanges, oldSchema);
-            }
-
-            for (TableChange tableChange : tableChanges) {
-                HANDLER_REGISTRY.apply(this, tableChange);
-            }
-
-            if (droppedPersistedCnt > 0 && isQueryChange) {
-                final int schemaSize = oldSchema.getColumns().size();
-                throw new ValidationException(
-                        String.format(
-                                "Failed to modify query because drop column is 
unsupported. "
-                                        + "When modifying a query, you can 
only append new columns at the end of original schema. "
-                                        + "The original schema has %d columns, 
but the newly derived schema from the query has %d columns.",
-                                schemaSize, schemaSize - droppedPersistedCnt));
-            }
-        }
-
-        private Schema retrieveSchema() {
-            Schema.Builder schemaToApply = 
Schema.newBuilder().fromColumns(columns);
-            if (primaryKeyColumns != null) {
-                if (primaryKeyName == null) {
-                    schemaToApply.primaryKey(primaryKeyColumns);
-                } else {
-                    schemaToApply.primaryKeyNamed(primaryKeyName, 
primaryKeyColumns);
-                }
-            }
-
-            for (UnresolvedWatermarkSpec spec : watermarkSpecs) {
-                schemaToApply.watermark(spec.getColumnName(), 
spec.getWatermarkExpression());
-            }
-            return schemaToApply.build();
-        }
-
-        private void addColumn(AddColumn addColumn) {
-            Column column = addColumn.getColumn();
-            ColumnPosition position = addColumn.getPosition();
-            UnresolvedColumn columnToAdd = toUnresolvedColumn(column);
-            setColumnAtPosition(columnToAdd, position);
-        }
-
-        private void modifyColumn(ModifyColumn modifyColumn) {
-            Column column = modifyColumn.getOldColumn();
-            Column newColumn = modifyColumn.getNewColumn();
-            int index = getColumnIndex(column.getName());
-            UnresolvedColumn newColumn1 = toUnresolvedColumn(newColumn);
-            columns.set(index, newColumn1);
-        }
-
-        private void dropColumn(DropColumn dropColumn) {
-            String droppedColumnName = dropColumn.getColumnName();
-            int index = getColumnIndex(droppedColumnName);
-            UnresolvedColumn column = columns.get(index);
-            if (isQueryChange && isNonPersistedColumn(column)) {
-                // noop
-            } else {
-                columns.remove(index);
-                droppedPersistedCnt++;
-            }
-        }
-
-        private void modifyPhysicalColumnType(ModifyPhysicalColumnType 
modifyPhysicalColumnType) {
-            Column column = modifyPhysicalColumnType.getOldColumn();
-            int position = getColumnIndex(column.getName());
-            columns.set(position, 
toUnresolvedColumn(modifyPhysicalColumnType.getNewColumn()));
-        }
-
-        private void modifyColumnComment(ModifyColumnComment 
modifyColumnComment) {
-            Column column = modifyColumnComment.getOldColumn();
-            int position = getColumnIndex(column.getName());
-            columns.set(position, 
toUnresolvedColumn(modifyColumnComment.getNewColumn()));
-        }
-
-        private void modifyColumnPosition(ModifyColumnPosition 
columnWithChangedPosition) {
-            Column column = columnWithChangedPosition.getOldColumn();
-            int oldPosition = getColumnIndex(column.getName());
-            if (isQueryChange) {
-                throw new ValidationException(
-                        String.format(
-                                "When modifying the query of a materialized 
table, "
-                                        + "currently only support appending 
columns at the end of original schema, dropping, renaming, and reordering 
columns are not supported.\n"
-                                        + "Column mismatch at position %d: 
Original column is [%s], but new column is [%s].",
-                                oldPosition, column, column));
-            }
-
-            ColumnPosition position = 
columnWithChangedPosition.getNewPosition();
-            UnresolvedColumn changedPositionColumn = columns.get(oldPosition);
-            columns.remove(oldPosition);
-            setColumnAtPosition(changedPositionColumn, position);
-        }
-
-        private void modifyDefinitionQuery(ModifyDefinitionQuery queryChange) {
-            expandedQuery = queryChange.getDefinitionQuery();
-            originalQuery = queryChange.getOriginalQuery();
-        }
-
-        private boolean isNonPersistedColumn(UnresolvedColumn column) {
-            return column instanceof UnresolvedComputedColumn
-                    || column instanceof UnresolvedMetadataColumn
-                            && ((UnresolvedMetadataColumn) column).isVirtual();
-        }
-
-        private void addUniqueConstraint(AddUniqueConstraint 
addUniqueConstraint) {
-            final UniqueConstraint constraint = 
addUniqueConstraint.getConstraint();
-            primaryKeyName = constraint.getName();
-            primaryKeyColumns = constraint.getColumns();
-        }
-
-        private void modifyUniqueConstraint(ModifyUniqueConstraint 
modifyUniqueConstraint) {
-            final UniqueConstraint constraint = 
modifyUniqueConstraint.getNewConstraint();
-            primaryKeyName = constraint.getName();
-            primaryKeyColumns = constraint.getColumns();
-        }
-
-        private void dropConstraint(DropConstraint dropConstraint) {
-            primaryKeyName = null;
-            primaryKeyColumns = null;
-        }
-
-        private void addWatermark(AddWatermark addWatermark) {
-            final WatermarkSpec spec = addWatermark.getWatermark();
-            String rowTimeAttribute = spec.getRowtimeAttribute();
-            ResolvedExpression expression = spec.getWatermarkExpression();
-            watermarkSpecs = List.of(new 
UnresolvedWatermarkSpec(rowTimeAttribute, expression));
-        }
-
-        private void modifyWatermark(ModifyWatermark modifyWatermark) {
-            final WatermarkSpec spec = modifyWatermark.getNewWatermark();
-            String rowTimeAttribute = spec.getRowtimeAttribute();
-            ResolvedExpression expression = spec.getWatermarkExpression();
-            watermarkSpecs = List.of(new 
UnresolvedWatermarkSpec(rowTimeAttribute, expression));
-        }
-
-        private void dropWatermark(DropWatermark dropWatermark) {
-            watermarkSpecs = List.of();
-        }
-
-        private void modifyRefreshHandler(ModifyRefreshHandler refreshHandler) 
{
-            refreshHandlerDesc = refreshHandler.getRefreshHandlerDesc();
-            refreshHandlerBytes = refreshHandler.getRefreshHandlerBytes();
-        }
-
-        private void modifyRefreshStatus(ModifyRefreshStatus 
modifyRefreshStatus) {
-            refreshStatus = modifyRefreshStatus.getRefreshStatus();
-        }
-
-        private void addDistribution(AddDistribution addDistribution) {
-            distribution = addDistribution.getDistribution();
-        }
-
-        private void modifyDistribution(ModifyDistribution modifyDistribution) 
{
-            distribution = modifyDistribution.getDistribution();
-        }
-
-        private void dropDistribution(DropDistribution dropDistribution) {
-            distribution = null;
-        }
-
-        private void setTableOption(TableChange.SetOption option) {
-            options.put(option.getKey(), option.getValue());
-        }
-
-        private void resetTableOption(TableChange.ResetOption option) {
-            options.remove(option.getKey());
-        }
-
-        private UnresolvedColumn toUnresolvedColumn(Column column) {
-            final String name = column.getName();
-            final String comment = column.getComment().orElse(null);
-            final DataType type = column.getDataType();
-            if (column instanceof PhysicalColumn) {
-                return new UnresolvedPhysicalColumn(name, type, comment);
-            } else if (column instanceof MetadataColumn) {
-                final MetadataColumn metadataColumn = (MetadataColumn) column;
-                final String metadataKey = 
metadataColumn.getMetadataKey().orElse(null);
-                return new UnresolvedMetadataColumn(
-                        name, type, metadataKey, metadataColumn.isVirtual(), 
comment);
-            } else {
-                return new UnresolvedComputedColumn(
-                        name, ((ComputedColumn) column).getExpression(), 
comment);
-            }
-        }
-
-        private void checkForChangedPositionByQuery(
-                List<TableChange> tableChanges, Schema oldSchema) {
-            List<ModifyColumnPosition> positionChanges =
-                    tableChanges.stream()
-                            .filter(t -> t instanceof ModifyColumnPosition)
-                            .map(t -> (ModifyColumnPosition) t)
-                            .collect(Collectors.toList());
-
-            List<ModifyPhysicalColumnType> physicalTypeChanges =
-                    tableChanges.stream()
-                            .filter(t -> t instanceof ModifyPhysicalColumnType)
-                            .map(t -> (ModifyPhysicalColumnType) t)
-                            .collect(Collectors.toList());
-
-            if (positionChanges.isEmpty() && physicalTypeChanges.isEmpty()) {
-                return;
-            }
-
-            int persistedColumnOffset = 0;
-            List<UnresolvedColumn> oldColumns = oldSchema.getColumns();
-            for (UnresolvedColumn column : oldColumns) {
-                if (!isNonPersistedColumn(column)) {
-                    persistedColumnOffset++;
-                }
-            }
-
-            Map<String, Column> afterToColumnName = new HashMap<>();
-            for (ModifyColumnPosition change : positionChanges) {
-                final ColumnPosition position = change.getNewPosition();
-                final Column newColumn = change.getNewColumn();
-                if (position == ColumnPosition.first()) {
-                    if (persistedColumnOffset == 0) {
-                        throwPositionChangeError(
-                                newColumn.asSummaryString(),
-                                
oldColumns.get(persistedColumnOffset).toString(),
-                                persistedColumnOffset);
-                    } else {
-                        afterToColumnName.put(
-                                
oldColumns.get(persistedColumnOffset).getName(), newColumn);
-                    }
-                } else {
-                    afterToColumnName.put(((After) position).column(), 
newColumn);
-                }
-            }
-
-            for (int i = 1; i < oldColumns.size(); i++) {
-                Column newColumn = 
afterToColumnName.get(oldColumns.get(i).getName());
-                if (newColumn != null) {
-                    throwPositionChangeError(oldColumns.get(i + 1), newColumn, 
i + 1);
-                }
-            }
-
-            Map<String, Integer> nameToIndex = new HashMap<>();
-            for (int i = 0; i < oldColumns.size(); i++) {
-                UnresolvedColumn column = oldColumns.get(i);
-                if (!isNonPersistedColumn(column)) {
-                    nameToIndex.put(column.getName(), i);
-                }
-            }
-
-            for (ModifyPhysicalColumnType change : physicalTypeChanges) {
-                final int index = 
nameToIndex.get(change.getOldColumn().getName());
-                throwPositionChangeError(change.getOldColumn(), 
change.getNewColumn(), index);
-            }
-        }
-
-        private static void throwPositionChangeError(
-                UnresolvedColumn oldColumn, Column newColumn, int position) {
-            throwPositionChangeError(oldColumn.toString(), 
newColumn.asSummaryString(), position);
-        }
-
-        private static void throwPositionChangeError(
-                Column oldColumn, Column newColumn, int position) {
-            throwPositionChangeError(
-                    oldColumn.asSummaryString(), newColumn.asSummaryString(), 
position);
-        }
-
-        private static void throwPositionChangeError(
-                String oldColumn, String newColumn, int position) {
-            throw new ValidationException(
-                    String.format(
-                            "When modifying the query of a materialized table, 
"
-                                    + "currently only support appending 
columns at the end of original schema, dropping, renaming, and reordering 
columns are not supported.\n"
-                                    + "Column mismatch at position %d: 
Original column is [%s], but new column is [%s].",
-                            position, oldColumn, newColumn));
-        }
-
-        private void setColumnAtPosition(UnresolvedColumn column, 
ColumnPosition position) {
-            if (position == null) {
-                columns.add(column);
-            } else if (position == ColumnPosition.first()) {
-                columns.add(0, column);
-            } else {
-                String after = ((After) position).column();
-                int index = getColumnIndex(after);
-
-                columns.add(index + 1, column);
-            }
-        }
-
-        private int getColumnIndex(String name) {
-            for (int i = 0; i < columns.size(); i++) {
-                if (Objects.equals(name, columns.get(i).getName())) {
-                    return i;
-                }
-            }
-            return -1;
-        }
-    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java
new file mode 100644
index 00000000000..4b1d4ae395d
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.TableChange;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Operation for CREATE OR ALTER MATERIALIZED TABLE ... in case materialized 
table is present and
+ * full materialized table changes should be calculated.
+ */
+@Internal
+public class FullAlterMaterializedTableOperation extends 
AlterMaterializedTableChangeOperation {
+
+    public FullAlterMaterializedTableOperation(
+            final ObjectIdentifier tableIdentifier,
+            final Function<ResolvedCatalogMaterializedTable, 
List<TableChange>> tableChangeForTable,
+            final ResolvedCatalogMaterializedTable oldTable) {
+        super(tableIdentifier, tableChangeForTable, oldTable);
+    }
+
+    @Override
+    protected String getOperationName() {
+        return "CREATE OR ALTER MATERIALIZED TABLE";
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
new file mode 100644
index 00000000000..7acde833570
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeHandler.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.UnresolvedColumn;
+import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
+import org.apache.flink.table.api.Schema.UnresolvedWatermarkSpec;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.Column.MetadataColumn;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableChange.AddColumn;
+import org.apache.flink.table.catalog.TableChange.AddDistribution;
+import org.apache.flink.table.catalog.TableChange.AddUniqueConstraint;
+import org.apache.flink.table.catalog.TableChange.AddWatermark;
+import org.apache.flink.table.catalog.TableChange.After;
+import org.apache.flink.table.catalog.TableChange.ColumnPosition;
+import org.apache.flink.table.catalog.TableChange.DropConstraint;
+import org.apache.flink.table.catalog.TableChange.DropDistribution;
+import org.apache.flink.table.catalog.TableChange.ModifyColumn;
+import org.apache.flink.table.catalog.TableChange.ModifyColumnComment;
+import org.apache.flink.table.catalog.TableChange.ModifyColumnPosition;
+import org.apache.flink.table.catalog.TableChange.ModifyDefinitionQuery;
+import org.apache.flink.table.catalog.TableChange.ModifyDistribution;
+import org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType;
+import org.apache.flink.table.catalog.TableChange.ModifyRefreshHandler;
+import org.apache.flink.table.catalog.TableChange.ModifyRefreshStatus;
+import org.apache.flink.table.catalog.TableChange.ModifyUniqueConstraint;
+import org.apache.flink.table.catalog.TableChange.ModifyWatermark;
+import org.apache.flink.table.catalog.TableChange.ResetOption;
+import org.apache.flink.table.catalog.TableChange.SetOption;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.WatermarkSpec;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+/** Applying table changes to old materialized table and gathering validation 
errors. */
+@Internal
+public class MaterializedTableChangeHandler {
+    private static final HandlerRegistry HANDLER_REGISTRY = 
createHandlerRegistry();
+
+    private final List<UnresolvedColumn> columns;
+    private final CatalogMaterializedTable oldTable;
+    private boolean isQueryChange;
+    private @Nullable TableDistribution distribution;
+    private CatalogMaterializedTable.RefreshStatus refreshStatus;
+    private @Nullable String refreshHandlerDesc;
+    private byte[] refreshHandlerBytes;
+    private List<UnresolvedWatermarkSpec> watermarkSpecs;
+    private String primaryKeyName = null;
+    private List<String> primaryKeyColumns = null;
+    private int droppedPersistedCnt = 0;
+    private String originalQuery;
+    private String expandedQuery;
+    private final Map<String, String> options;
+    private final List<String> validationErrors = new ArrayList<>();
+
+    public MaterializedTableChangeHandler(CatalogMaterializedTable oldTable) {
+        this.distribution = oldTable.getDistribution().orElse(null);
+        this.refreshStatus = oldTable.getRefreshStatus();
+        this.refreshHandlerDesc = 
oldTable.getRefreshHandlerDescription().orElse(null);
+        this.refreshHandlerBytes = oldTable.getSerializedRefreshHandler();
+        this.watermarkSpecs = 
oldTable.getUnresolvedSchema().getWatermarkSpecs();
+        this.columns = new 
LinkedList<>(oldTable.getUnresolvedSchema().getColumns());
+        Schema.UnresolvedPrimaryKey primaryKey =
+                oldTable.getUnresolvedSchema().getPrimaryKey().orElse(null);
+        if (primaryKey != null) {
+            this.primaryKeyName = primaryKey.getConstraintName();
+            this.primaryKeyColumns = primaryKey.getColumnNames();
+        }
+        originalQuery = oldTable.getOriginalQuery();
+        expandedQuery = oldTable.getExpandedQuery();
+        this.oldTable = oldTable;
+        this.options = new HashMap<>(oldTable.getOptions());
+    }
+
+    private static final class HandlerRegistry {
+        private static final Map<Class<? extends TableChange>, 
HandlerRegistry.HandlerWrapper<?>>
+                HANDLERS = new IdentityHashMap<>();
+
+        private <T extends TableChange> void register(
+                Class<T> type, BiConsumer<MaterializedTableChangeHandler, T> 
handler) {
+            HANDLERS.put(type, new HandlerRegistry.HandlerWrapper<>(handler));
+        }
+
+        private void apply(MaterializedTableChangeHandler context, TableChange 
change) {
+            HandlerRegistry.HandlerWrapper<?> wrapper = 
HANDLERS.get(change.getClass());
+            if (wrapper == null) {
+                context.validationErrors.add("Unknown table change " + 
change.getClass());
+            } else {
+                wrapper.accept(context, change);
+            }
+        }
+
+        private static final class HandlerWrapper<T extends TableChange> {
+            private final BiConsumer<MaterializedTableChangeHandler, T> 
handler;
+
+            private HandlerWrapper(BiConsumer<MaterializedTableChangeHandler, 
T> handler) {
+                this.handler = handler;
+            }
+
+            private void accept(MaterializedTableChangeHandler context, 
TableChange change) {
+                handler.accept(context, (T) change);
+            }
+        }
+    }
+
+    public List<String> getValidationErrors() {
+        return List.copyOf(validationErrors);
+    }
+
+    public static MaterializedTableChangeHandler getHandlerWithChanges(
+            CatalogMaterializedTable oldTable, List<TableChange> tableChanges) 
{
+        MaterializedTableChangeHandler handler = new 
MaterializedTableChangeHandler(oldTable);
+        handler.applyTableChanges(tableChanges);
+        return handler;
+    }
+
+    public static CatalogMaterializedTable buildNewMaterializedTable(
+            MaterializedTableChangeHandler context) {
+        final List<String> validationErrors = context.getValidationErrors();
+        if (!validationErrors.isEmpty()) {
+            throw new ValidationException(String.join("\n", validationErrors));
+        }
+
+        final CatalogMaterializedTable oldTable = context.getOldTable();
+        return CatalogMaterializedTable.newBuilder()
+                .schema(context.retrieveSchema())
+                .comment(oldTable.getComment())
+                .partitionKeys(oldTable.getPartitionKeys())
+                .options(context.getOptions())
+                .originalQuery(context.getOriginalQuery())
+                .expandedQuery(context.getExpandedQuery())
+                .distribution(context.getDistribution())
+                .freshness(oldTable.getDefinitionFreshness())
+                .logicalRefreshMode(oldTable.getLogicalRefreshMode())
+                .refreshMode(oldTable.getRefreshMode())
+                .refreshStatus(context.getRefreshStatus())
+                .refreshHandlerDescription(context.getRefreshHandlerDesc())
+                .serializedRefreshHandler(context.getRefreshHandlerBytes())
+                .build();
+    }
+
+    private static HandlerRegistry createHandlerRegistry() {
+        HandlerRegistry registry = new HandlerRegistry();
+
+        // Column operations
+        registry.register(AddColumn.class, 
MaterializedTableChangeHandler::addColumn);
+        registry.register(ModifyColumn.class, 
MaterializedTableChangeHandler::modifyColumn);
+        registry.register(TableChange.DropColumn.class, 
MaterializedTableChangeHandler::dropColumn);
+        registry.register(
+                ModifyPhysicalColumnType.class,
+                MaterializedTableChangeHandler::modifyPhysicalColumnType);
+        registry.register(
+                ModifyColumnComment.class, 
MaterializedTableChangeHandler::modifyColumnComment);
+        registry.register(
+                ModifyColumnPosition.class, 
MaterializedTableChangeHandler::modifyColumnPosition);
+
+        // Query operations
+        registry.register(
+                ModifyDefinitionQuery.class, 
MaterializedTableChangeHandler::modifyDefinitionQuery);
+
+        // Constraint operations
+        registry.register(
+                AddUniqueConstraint.class, 
MaterializedTableChangeHandler::addUniqueConstraint);
+        registry.register(
+                ModifyUniqueConstraint.class,
+                MaterializedTableChangeHandler::modifyUniqueConstraint);
+        registry.register(DropConstraint.class, 
MaterializedTableChangeHandler::dropConstraint);
+
+        // Watermark operations
+        registry.register(AddWatermark.class, 
MaterializedTableChangeHandler::addWatermark);
+        registry.register(ModifyWatermark.class, 
MaterializedTableChangeHandler::modifyWatermark);
+        registry.register(
+                TableChange.DropWatermark.class, 
MaterializedTableChangeHandler::dropWatermark);
+
+        // Refresh operations
+        registry.register(
+                ModifyRefreshHandler.class, 
MaterializedTableChangeHandler::modifyRefreshHandler);
+        registry.register(
+                ModifyRefreshStatus.class, 
MaterializedTableChangeHandler::modifyRefreshStatus);
+
+        // Distribution operations
+        registry.register(AddDistribution.class, 
MaterializedTableChangeHandler::addDistribution);
+        registry.register(
+                ModifyDistribution.class, 
MaterializedTableChangeHandler::modifyDistribution);
+        registry.register(DropDistribution.class, 
MaterializedTableChangeHandler::dropDistribution);
+
+        // Options
+        registry.register(SetOption.class, 
MaterializedTableChangeHandler::setTableOption);
+        registry.register(ResetOption.class, 
MaterializedTableChangeHandler::resetTableOption);
+
+        return registry;
+    }
+
+    void applyTableChanges(List<TableChange> tableChanges) {
+        isQueryChange = tableChanges.stream().anyMatch(t -> t instanceof 
ModifyDefinitionQuery);
+        Schema oldSchema = oldTable.getUnresolvedSchema();
+        if (isQueryChange) {
+            checkForChangedPositionByQuery(tableChanges, oldSchema);
+        }
+
+        for (TableChange tableChange : tableChanges) {
+            HANDLER_REGISTRY.apply(this, tableChange);
+        }
+
+        if (droppedPersistedCnt > 0 && isQueryChange) {
+            final int schemaSize = oldSchema.getColumns().size();
+            validationErrors.add(
+                    String.format(
+                            "Failed to modify query because drop column is 
unsupported. "
+                                    + "When modifying a query, you can only 
append new columns at the end of original schema. "
+                                    + "The original schema has %d columns, but 
the newly derived schema from the query has %d columns.",
+                            schemaSize, schemaSize - droppedPersistedCnt));
+        }
+    }
+
+    public Schema retrieveSchema() {
+        Schema.Builder schemaToApply = 
Schema.newBuilder().fromColumns(columns);
+        if (primaryKeyColumns != null) {
+            if (primaryKeyName == null) {
+                schemaToApply.primaryKey(primaryKeyColumns);
+            } else {
+                schemaToApply.primaryKeyNamed(primaryKeyName, 
primaryKeyColumns);
+            }
+        }
+
+        for (UnresolvedWatermarkSpec spec : watermarkSpecs) {
+            schemaToApply.watermark(spec.getColumnName(), 
spec.getWatermarkExpression());
+        }
+        return schemaToApply.build();
+    }
+
+    public String getExpandedQuery() {
+        return expandedQuery;
+    }
+
+    public String getOriginalQuery() {
+        return originalQuery;
+    }
+
+    public Map<String, String> getOptions() {
+        return options;
+    }
+
+    @Nullable
+    public TableDistribution getDistribution() {
+        return distribution;
+    }
+
+    public byte[] getRefreshHandlerBytes() {
+        return refreshHandlerBytes;
+    }
+
+    @Nullable
+    public String getRefreshHandlerDesc() {
+        return refreshHandlerDesc;
+    }
+
+    public CatalogMaterializedTable.RefreshStatus getRefreshStatus() {
+        return refreshStatus;
+    }
+
+    public CatalogMaterializedTable getOldTable() {
+        return oldTable;
+    }
+
+    private void addColumn(AddColumn addColumn) {
+        Column column = addColumn.getColumn();
+        ColumnPosition position = addColumn.getPosition();
+        UnresolvedColumn columnToAdd = toUnresolvedColumn(column);
+        setColumnAtPosition(columnToAdd, position);
+    }
+
+    private void modifyColumn(ModifyColumn modifyColumn) {
+        Column column = modifyColumn.getOldColumn();
+        Column newColumn = modifyColumn.getNewColumn();
+        int index = getColumnIndex(column.getName());
+        UnresolvedColumn newUnresolvedColumn = toUnresolvedColumn(newColumn);
+        columns.set(index, newUnresolvedColumn);
+    }
+
+    private void dropColumn(TableChange.DropColumn dropColumn) {
+        String droppedColumnName = dropColumn.getColumnName();
+        int index = getColumnIndex(droppedColumnName);
+        UnresolvedColumn column = columns.get(index);
+        if (isQueryChange && isNonPersistedColumn(column)) {
+            // noop
+        } else {
+            columns.remove(index);
+            droppedPersistedCnt++;
+        }
+    }
+
+    private void modifyPhysicalColumnType(ModifyPhysicalColumnType 
modifyPhysicalColumnType) {
+        Column column = modifyPhysicalColumnType.getOldColumn();
+        int position = getColumnIndex(column.getName());
+        columns.set(position, 
toUnresolvedColumn(modifyPhysicalColumnType.getNewColumn()));
+    }
+
+    private void modifyColumnComment(ModifyColumnComment modifyColumnComment) {
+        Column column = modifyColumnComment.getOldColumn();
+        int position = getColumnIndex(column.getName());
+        columns.set(position, 
toUnresolvedColumn(modifyColumnComment.getNewColumn()));
+    }
+
+    private void modifyColumnPosition(ModifyColumnPosition 
columnWithChangedPosition) {
+        Column column = columnWithChangedPosition.getOldColumn();
+        int oldPosition = getColumnIndex(column.getName());
+        if (isQueryChange) {
+            validationErrors.add(
+                    String.format(
+                            "When modifying the query of a materialized table, 
"
+                                    + "currently only support appending 
columns at the end of original schema, dropping, renaming, and reordering 
columns are not supported.\n"
+                                    + "Column mismatch at position %d: 
Original column is [%s], but new column is [%s].",
+                            oldPosition, column, column));
+        }
+
+        ColumnPosition position = columnWithChangedPosition.getNewPosition();
+        UnresolvedColumn changedPositionColumn = columns.get(oldPosition);
+        columns.remove(oldPosition);
+        setColumnAtPosition(changedPositionColumn, position);
+    }
+
+    private void modifyDefinitionQuery(ModifyDefinitionQuery queryChange) {
+        expandedQuery = queryChange.getDefinitionQuery();
+        originalQuery = queryChange.getOriginalQuery();
+    }
+
+    private boolean isNonPersistedColumn(UnresolvedColumn column) {
+        return column instanceof Schema.UnresolvedComputedColumn
+                || column instanceof Schema.UnresolvedMetadataColumn
+                        && ((Schema.UnresolvedMetadataColumn) 
column).isVirtual();
+    }
+
+    private void addUniqueConstraint(AddUniqueConstraint addUniqueConstraint) {
+        final UniqueConstraint constraint = 
addUniqueConstraint.getConstraint();
+        primaryKeyName = constraint.getName();
+        primaryKeyColumns = constraint.getColumns();
+    }
+
+    private void modifyUniqueConstraint(ModifyUniqueConstraint 
modifyUniqueConstraint) {
+        final UniqueConstraint constraint = 
modifyUniqueConstraint.getNewConstraint();
+        primaryKeyName = constraint.getName();
+        primaryKeyColumns = constraint.getColumns();
+    }
+
+    private void dropConstraint(DropConstraint dropConstraint) {
+        primaryKeyName = null;
+        primaryKeyColumns = null;
+    }
+
+    private void addWatermark(AddWatermark addWatermark) {
+        final WatermarkSpec spec = addWatermark.getWatermark();
+        String rowTimeAttribute = spec.getRowtimeAttribute();
+        ResolvedExpression expression = spec.getWatermarkExpression();
+        watermarkSpecs = List.of(new UnresolvedWatermarkSpec(rowTimeAttribute, 
expression));
+    }
+
+    private void modifyWatermark(ModifyWatermark modifyWatermark) {
+        final WatermarkSpec spec = modifyWatermark.getNewWatermark();
+        String rowTimeAttribute = spec.getRowtimeAttribute();
+        ResolvedExpression expression = spec.getWatermarkExpression();
+        watermarkSpecs = List.of(new UnresolvedWatermarkSpec(rowTimeAttribute, 
expression));
+    }
+
+    private void dropWatermark(TableChange.DropWatermark dropWatermark) {
+        watermarkSpecs = List.of();
+    }
+
+    private void modifyRefreshHandler(ModifyRefreshHandler refreshHandler) {
+        refreshHandlerDesc = refreshHandler.getRefreshHandlerDesc();
+        refreshHandlerBytes = refreshHandler.getRefreshHandlerBytes();
+    }
+
+    private void modifyRefreshStatus(ModifyRefreshStatus modifyRefreshStatus) {
+        refreshStatus = modifyRefreshStatus.getRefreshStatus();
+    }
+
+    private void addDistribution(AddDistribution addDistribution) {
+        distribution = addDistribution.getDistribution();
+    }
+
+    private void modifyDistribution(ModifyDistribution modifyDistribution) {
+        distribution = modifyDistribution.getDistribution();
+    }
+
+    private void dropDistribution(DropDistribution dropDistribution) {
+        distribution = null;
+    }
+
+    private void setTableOption(SetOption option) {
+        options.put(option.getKey(), option.getValue());
+    }
+
+    private void resetTableOption(ResetOption option) {
+        options.remove(option.getKey());
+    }
+
+    private UnresolvedColumn toUnresolvedColumn(Column column) {
+        final String name = column.getName();
+        final String comment = column.getComment().orElse(null);
+        final DataType type = column.getDataType();
+        if (column instanceof Column.PhysicalColumn) {
+            return new UnresolvedPhysicalColumn(name, type, comment);
+        } else if (column instanceof MetadataColumn) {
+            final MetadataColumn metadataColumn = (MetadataColumn) column;
+            final String metadataKey = 
metadataColumn.getMetadataKey().orElse(null);
+            return new Schema.UnresolvedMetadataColumn(
+                    name, type, metadataKey, metadataColumn.isVirtual(), 
comment);
+        } else {
+            return new Schema.UnresolvedComputedColumn(
+                    name, ((Column.ComputedColumn) column).getExpression(), 
comment);
+        }
+    }
+
    /**
     * Validates that the given table changes do not reposition or retype columns, which is not
     * supported when the schema is driven by the materialized table's query. Any violation is
     * appended to {@code validationErrors} via {@code positionChangeError}.
     *
     * @param tableChanges the changes to inspect; only {@code ModifyColumnPosition} and
     *     {@code ModifyPhysicalColumnType} entries are examined
     * @param oldSchema the schema of the table before the changes are applied
     */
    private void checkForChangedPositionByQuery(List<TableChange> tableChanges, Schema oldSchema) {
        List<ModifyColumnPosition> positionChanges =
                tableChanges.stream()
                        .filter(t -> t instanceof ModifyColumnPosition)
                        .map(t -> (ModifyColumnPosition) t)
                        .collect(Collectors.toList());

        List<ModifyPhysicalColumnType> physicalTypeChanges =
                tableChanges.stream()
                        .filter(t -> t instanceof ModifyPhysicalColumnType)
                        .map(t -> (ModifyPhysicalColumnType) t)
                        .collect(Collectors.toList());

        // Nothing to validate if no position or physical-type changes are present.
        if (positionChanges.isEmpty() && physicalTypeChanges.isEmpty()) {
            return;
        }

        // Counts the persisted (physical / non-virtual metadata) columns of the old schema.
        // NOTE(review): the resulting value is a COUNT but is later used as an INDEX into
        // oldColumns; when every column is persisted this equals oldColumns.size() and
        // oldColumns.get(persistedColumnOffset) would throw — confirm the intended semantics
        // (possibly the index of the first persisted column was meant).
        int persistedColumnOffset = 0;
        List<UnresolvedColumn> oldColumns = oldSchema.getColumns();
        for (UnresolvedColumn column : oldColumns) {
            if (!isNonPersistedColumn(column)) {
                persistedColumnOffset++;
            }
        }

        // Maps "name of the column a moved column should come after" -> the moved column.
        Map<String, Column> afterToColumnName = new HashMap<>();
        for (ModifyColumnPosition change : positionChanges) {
            final ColumnPosition position = change.getNewPosition();
            final Column newColumn = change.getNewColumn();
            // NOTE(review): reference comparison assumes ColumnPosition.first() returns a
            // singleton instance — confirm.
            if (position == ColumnPosition.first()) {
                if (persistedColumnOffset == 0) {
                    // No persisted column exists, so moving a column FIRST is always an error.
                    // NOTE(review): oldColumns.get(0) would throw on an empty schema — confirm
                    // an empty oldColumns cannot reach this point.
                    positionChangeError(
                            newColumn.asSummaryString(),
                            oldColumns.get(persistedColumnOffset).toString(),
                            persistedColumnOffset);
                } else {
                    afterToColumnName.put(
                            oldColumns.get(persistedColumnOffset).getName(), newColumn);
                }
            } else {
                afterToColumnName.put(((After) position).column(), newColumn);
            }
        }

        // Any AFTER target that matches an existing column means the column at the following
        // position would be displaced — report that as a mismatch.
        for (int i = 0; i < oldColumns.size() - 1; i++) {
            Column newColumn = afterToColumnName.get(oldColumns.get(i).getName());
            if (newColumn != null) {
                positionChangeError(oldColumns.get(i + 1), newColumn, i + 1);
            }
        }

        // Index of each persisted column by name, used to report type changes below.
        Map<String, Integer> nameToIndex = new HashMap<>();
        for (int i = 0; i < oldColumns.size(); i++) {
            UnresolvedColumn column = oldColumns.get(i);
            if (!isNonPersistedColumn(column)) {
                nameToIndex.put(column.getName(), i);
            }
        }

        // Every physical type change is reported as an unsupported modification.
        // NOTE(review): nameToIndex.get(...) returns null (NPE on unboxing) if the changed
        // column is not a persisted column of the old schema — confirm that cannot happen.
        for (ModifyPhysicalColumnType change : physicalTypeChanges) {
            final int index = nameToIndex.get(change.getOldColumn().getName());
            positionChangeError(change.getOldColumn(), change.getNewColumn(), index);
        }
    }
+
+    private void positionChangeError(UnresolvedColumn oldColumn, Column 
newColumn, int position) {
+        positionChangeError(oldColumn.toString(), newColumn.asSummaryString(), 
position);
+    }
+
+    private void positionChangeError(Column oldColumn, Column newColumn, int 
position) {
+        positionChangeError(oldColumn.asSummaryString(), 
newColumn.asSummaryString(), position);
+    }
+
+    private void positionChangeError(String oldColumn, String newColumn, int 
position) {
+        validationErrors.add(
+                String.format(
+                        "When modifying the query of a materialized table, "
+                                + "currently only support appending columns at 
the end of original schema, dropping, renaming, and reordering columns are not 
supported.\n"
+                                + "Column mismatch at position %d: Original 
column is [%s], but new column is [%s].",
+                        position, oldColumn, newColumn));
+    }
+
    /**
     * Inserts the given column into the working column list.
     *
     * <p>A {@code null} position appends at the end, FIRST prepends, and AFTER inserts the
     * column directly behind the referenced column.
     */
    private void setColumnAtPosition(UnresolvedColumn column, ColumnPosition position) {
        if (position == null) {
            columns.add(column);
        } else if (position == ColumnPosition.first()) {
            // NOTE(review): reference comparison assumes ColumnPosition.first() returns a
            // singleton instance — confirm.
            columns.add(0, column);
        } else {
            String after = ((After) position).column();
            int index = getColumnIndex(after);

            // NOTE(review): getColumnIndex returns -1 when the referenced column is absent,
            // which silently inserts the new column at index 0 — confirm this is intended
            // rather than raising a validation error.
            columns.add(index + 1, column);
        }
    }
+
+    private int getColumnIndex(String name) {
+        for (int i = 0; i < columns.size(); i++) {
+            if (Objects.equals(name, columns.get(i).getName())) {
+                return i;
+            }
+        }
+        return -1;
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractAlterMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractAlterMaterializedTableConverter.java
index fca8c516be2..4b8c4ccf318 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractAlterMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractAlterMaterializedTableConverter.java
@@ -22,17 +22,18 @@ import 
org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTab
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.utils.ValidationUtils;
 import org.apache.flink.table.planner.operations.converters.SqlNodeConverter;
 
+import java.util.List;
 import java.util.Optional;
-import java.util.function.Consumer;
+import java.util.function.Function;
 
 /** Abstract converter for {@link SqlAlterMaterializedTable}. */
 public abstract class AbstractAlterMaterializedTableConverter<T extends 
SqlAlterMaterializedTable>
@@ -44,6 +45,9 @@ public abstract class 
AbstractAlterMaterializedTableConverter<T extends SqlAlter
     protected abstract Operation convertToOperation(
             T sqlAlterTable, ResolvedCatalogMaterializedTable oldTable, 
ConvertContext context);
 
+    protected abstract Function<ResolvedCatalogMaterializedTable, 
List<TableChange>>
+            gatherTableChanges(T sqlAlterTable, ConvertContext context);
+
     @Override
     public final Operation convertSqlNode(T sqlAlterMaterializedTable, 
ConvertContext context) {
         final CatalogManager catalogManager = context.getCatalogManager();
@@ -75,29 +79,4 @@ public abstract class 
AbstractAlterMaterializedTableConverter<T extends SqlAlter
                 
UnresolvedIdentifier.of(sqlAlterMaterializedTable.getFullName());
         return 
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
     }
-
-    protected CatalogMaterializedTable buildUpdatedMaterializedTable(
-            ResolvedCatalogMaterializedTable oldTable,
-            Consumer<CatalogMaterializedTable.Builder> consumer) {
-
-        CatalogMaterializedTable.Builder builder =
-                CatalogMaterializedTable.newBuilder()
-                        .schema(oldTable.getUnresolvedSchema())
-                        .comment(oldTable.getComment())
-                        .partitionKeys(oldTable.getPartitionKeys())
-                        .options(oldTable.getOptions())
-                        .originalQuery(oldTable.getOriginalQuery())
-                        .expandedQuery(oldTable.getExpandedQuery())
-                        .distribution(oldTable.getDistribution().orElse(null))
-                        .freshness(oldTable.getDefinitionFreshness())
-                        .logicalRefreshMode(oldTable.getLogicalRefreshMode())
-                        .refreshMode(oldTable.getRefreshMode())
-                        .refreshStatus(oldTable.getRefreshStatus())
-                        .refreshHandlerDescription(
-                                
oldTable.getRefreshHandlerDescription().orElse(null))
-                        
.serializedRefreshHandler(oldTable.getSerializedRefreshHandler());
-
-        consumer.accept(builder);
-        return builder.build();
-    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAddDistributionConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAddDistributionConverter.java
index e78a753b3df..c475ef70aae 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAddDistributionConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAddDistributionConverter.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.table.planner.utils.OperationConverterUtils;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
 
 /** A converter for {@link SqlAlterMaterializedTableAddDistribution}. */
 public class SqlAlterMaterializedTableAddDistributionConverter
@@ -39,13 +40,23 @@ public class 
SqlAlterMaterializedTableAddDistributionConverter
             SqlAlterMaterializedTableAddDistribution sqlAddDistribution,
             ResolvedCatalogMaterializedTable oldTable,
             ConvertContext context) {
-        TableDistribution distribution = 
getTableDistribution(sqlAddDistribution, oldTable);
+
         return new AlterMaterializedTableChangeOperation(
                 resolveIdentifier(sqlAddDistribution, context),
-                List.of(TableChange.add(distribution)),
+                gatherTableChanges(sqlAddDistribution, context),
                 oldTable);
     }
 
+    @Override
+    protected Function<ResolvedCatalogMaterializedTable, List<TableChange>> 
gatherTableChanges(
+            SqlAlterMaterializedTableAddDistribution sqlAddDistribution, 
ConvertContext context) {
+        return oldTable -> {
+            final TableDistribution distribution =
+                    getTableDistribution(sqlAddDistribution, oldTable);
+            return List.of(TableChange.add(distribution));
+        };
+    }
+
     private TableDistribution getTableDistribution(
             SqlAlterMaterializedTableDistribution sqlAlterTableDistribution,
             ResolvedCatalogMaterializedTable oldTable) {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
index ef251ab8d4a..b48d472593a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.operations.converters.materializedtable;
 
 import 
org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableAsQuery;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -30,7 +31,9 @@ import 
org.apache.flink.table.planner.utils.MaterializedTableUtils;
 
 import org.apache.calcite.sql.SqlNode;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 
 /** A converter for {@link SqlAlterMaterializedTableAsQuery}. */
 public class SqlAlterMaterializedTableAsQueryConverter
@@ -41,23 +44,42 @@ public class SqlAlterMaterializedTableAsQueryConverter
             SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery,
             ResolvedCatalogMaterializedTable oldTable,
             ConvertContext context) {
-        ObjectIdentifier identifier = resolveIdentifier(sqlAlterTableAsQuery, 
context);
-
-        // Validate and extract schema from query
-        String originalQuery = 
context.toQuotedSqlString(sqlAlterTableAsQuery.getAsQuery());
-        SqlNode validatedQuery =
-                
context.getSqlValidator().validate(sqlAlterTableAsQuery.getAsQuery());
-        String definitionQuery = context.toQuotedSqlString(validatedQuery);
-        PlannerQueryOperation queryOperation =
-                new PlannerQueryOperation(
-                        context.toRelRoot(validatedQuery).project(), () -> 
definitionQuery);
-        ResolvedSchema oldSchema = oldTable.getResolvedSchema();
-        ResolvedSchema newSchema = queryOperation.getResolvedSchema();
-
-        List<TableChange> tableChanges =
-                MaterializedTableUtils.buildSchemaTableChanges(oldSchema, 
newSchema);
-        tableChanges.add(TableChange.modifyDefinitionQuery(originalQuery, 
definitionQuery));
-
-        return new AlterMaterializedTableAsQueryOperation(identifier, 
tableChanges, oldTable);
+        final ObjectIdentifier identifier = 
resolveIdentifier(sqlAlterTableAsQuery, context);
+        return new AlterMaterializedTableAsQueryOperation(
+                identifier, gatherTableChanges(sqlAlterTableAsQuery, context), 
oldTable);
+    }
+
+    @Override
+    protected Function<ResolvedCatalogMaterializedTable, List<TableChange>> 
gatherTableChanges(
+            SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery, 
ConvertContext context) {
+        return oldTable -> {
+            // Validate and extract schema from query
+            String originalQuery = 
context.toQuotedSqlString(sqlAlterTableAsQuery.getAsQuery());
+            SqlNode validatedQuery =
+                    
context.getSqlValidator().validate(sqlAlterTableAsQuery.getAsQuery());
+            String definitionQuery = context.toQuotedSqlString(validatedQuery);
+            PlannerQueryOperation queryOperation =
+                    new PlannerQueryOperation(
+                            context.toRelRoot(validatedQuery).project(), () -> 
definitionQuery);
+
+            ResolvedSchema oldSchema = oldTable.getResolvedSchema();
+            ResolvedSchema newSchema = queryOperation.getResolvedSchema();
+            List<TableChange> tableChanges =
+                    new ArrayList<>(
+                            
MaterializedTableUtils.buildSchemaTableChanges(oldSchema, newSchema));
+
+            if (!tableChanges.isEmpty()) {
+                final boolean hasNonPersistedColumn =
+                        oldSchema.getColumns().stream().anyMatch(c -> 
!c.isPersisted());
+                if (hasNonPersistedColumn) {
+                    throw new ValidationException(
+                            "ALTER query for MATERIALIZED TABLE "
+                                    + "with schema containing non persisted 
columns is not supported, "
+                                    + "consider using CREATE OR ALTER 
MATERIALIZED TABLE instead");
+                }
+            }
+            tableChanges.add(TableChange.modifyDefinitionQuery(originalQuery, 
definitionQuery));
+            return tableChanges;
+        };
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropDistributionConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropDistributionConverter.java
index 2d81569d4a1..7d85376088d 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropDistributionConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropDistributionConverter.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.operations.Operation;
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
 
 import java.util.List;
+import java.util.function.Function;
 
 /** A converter for {@link SqlAlterMaterializedTableDropDistribution}. */
 public class SqlAlterMaterializedTableDropDistributionConverter
@@ -45,6 +46,12 @@ public class 
SqlAlterMaterializedTableDropDistributionConverter
         }
 
         return new AlterMaterializedTableChangeOperation(
-                identifier, List.of(TableChange.dropDistribution()), oldTable);
+                identifier, gatherTableChanges(sqlDropDistribution, context), 
oldTable);
+    }
+
+    @Override
+    protected Function<ResolvedCatalogMaterializedTable, List<TableChange>> 
gatherTableChanges(
+            SqlAlterMaterializedTableDropDistribution sqlAlterTable, 
ConvertContext context) {
+        return oldTable -> List.of(TableChange.dropDistribution());
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java
index d953c8ac619..e8f8a41b44d 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableDropSchemaConverter.java
@@ -38,6 +38,7 @@ import org.apache.calcite.sql.SqlIdentifier;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -50,12 +51,11 @@ public abstract class 
SqlAlterMaterializedTableDropSchemaConverter<
     @Override
     protected Operation convertToOperation(
             T alterTableSchema, ResolvedCatalogMaterializedTable oldTable, 
ConvertContext context) {
-        Set<String> columnsToDrop = getColumnsToDrop(alterTableSchema);
-        List<TableChange> tableChanges =
-                validateAndGatherDropChanges(alterTableSchema, oldTable, 
columnsToDrop, context);
 
         return new AlterMaterializedTableChangeOperation(
-                resolveIdentifier(alterTableSchema, context), tableChanges, 
oldTable);
+                resolveIdentifier(alterTableSchema, context),
+                gatherTableChanges(alterTableSchema, context),
+                oldTable);
     }
 
     protected abstract List<TableChange> validateAndGatherDropChanges(
@@ -66,6 +66,15 @@ public abstract class 
SqlAlterMaterializedTableDropSchemaConverter<
 
     protected abstract Set<String> getColumnsToDrop(T alterTableSchema);
 
+    @Override
+    protected Function<ResolvedCatalogMaterializedTable, List<TableChange>> 
gatherTableChanges(
+            T alterTableSchema, ConvertContext context) {
+        return oldTable -> {
+            Set<String> columnsToDrop = getColumnsToDrop(alterTableSchema);
+            return validateAndGatherDropChanges(alterTableSchema, oldTable, 
columnsToDrop, context);
+        };
+    }
+
     /**
      * Convert {@code ALTER TABLE MATERIALIZED TABLE DROP PRIMARY KEY} to 
generate an updated
      * Schema.
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableModifyDistributionConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableModifyDistributionConverter.java
index b02f3f3ae18..ed490156f0c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableModifyDistributionConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableModifyDistributionConverter.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTabl
 import org.apache.flink.table.planner.utils.OperationConverterUtils;
 
 import java.util.List;
+import java.util.function.Function;
 
 /** A converter for {@link SqlAlterMaterializedTableModifyDistribution}. */
 public class SqlAlterMaterializedTableModifyDistributionConverter
@@ -48,11 +49,17 @@ public class 
SqlAlterMaterializedTableModifyDistributionConverter
                             identifier));
         }
 
-        TableDistribution tableDistribution =
+        return new AlterMaterializedTableChangeOperation(
+                identifier, gatherTableChanges(sqlModifyDistribution, 
context), oldTable);
+    }
+
+    @Override
+    protected Function<ResolvedCatalogMaterializedTable, List<TableChange>> 
gatherTableChanges(
+            SqlAlterMaterializedTableModifyDistribution sqlModifyDistribution,
+            ConvertContext context) {
+        final TableDistribution tableDistribution =
                 OperationConverterUtils.getDistributionFromSqlDistribution(
                         sqlModifyDistribution.getDistribution().get());
-
-        return new AlterMaterializedTableChangeOperation(
-                identifier, List.of(TableChange.modify(tableDistribution)), 
oldTable);
+        return oldTable -> List.of(TableChange.modify(tableDistribution));
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableSchemaConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableSchemaConverter.java
index f407551f074..05ffdd3ce45 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableSchemaConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableSchemaConverter.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.catalog.Column.PhysicalColumn;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.operations.Operation;
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
 import org.apache.flink.table.planner.operations.converters.SchemaAddConverter;
@@ -47,6 +48,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.function.Function;
 
 /**
  * Abstract class for converting {@link SqlAlterMaterializedTableSchema} and 
its children for alter
@@ -58,23 +60,32 @@ public abstract class 
SqlAlterMaterializedTableSchemaConverter<
     @Override
     protected Operation convertToOperation(
             T alterTableSchema, ResolvedCatalogMaterializedTable oldTable, 
ConvertContext context) {
-        MaterializedTableUtils.validatePersistedColumnsUsedByQuery(
-                oldTable, alterTableSchema, context);
-
-        SchemaConverter converter = createSchemaConverter(oldTable, context);
-        
converter.updateColumn(alterTableSchema.getColumnPositions().getList());
-        alterTableSchema.getWatermark().ifPresent(converter::updateWatermark);
-        
alterTableSchema.getFullConstraint().ifPresent(converter::updatePrimaryKey);
-        Schema schema = converter.convert();
-
-        validateChanges(oldTable.getResolvedSchema(), schema, context);
-
         return new AlterMaterializedTableChangeOperation(
                 resolveIdentifier(alterTableSchema, context),
-                converter.getChangesCollector(),
+                gatherTableChanges(alterTableSchema, context),
                 oldTable);
     }
 
+    @Override
+    protected Function<ResolvedCatalogMaterializedTable, List<TableChange>> 
gatherTableChanges(
+            T alterTableSchema, ConvertContext context) {
+        return oldTable -> {
+            MaterializedTableUtils.validatePersistedColumnsUsedByQuery(
+                    oldTable, alterTableSchema, context);
+
+            SchemaConverter converter = createSchemaConverter(oldTable, 
context);
+            
converter.updateColumn(alterTableSchema.getColumnPositions().getList());
+            
alterTableSchema.getWatermark().ifPresent(converter::updateWatermark);
+            
alterTableSchema.getFullConstraint().ifPresent(converter::updatePrimaryKey);
+            Schema schema = converter.convert();
+
+            ResolvedSchema oldSchema = oldTable.getResolvedSchema();
+            validateChanges(oldSchema, schema, context);
+
+            return converter.getChangesCollector();
+        };
+    }
+
     protected abstract SchemaConverter createSchemaConverter(
             ResolvedCatalogMaterializedTable oldTable, ConvertContext context);
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index 4fe78a9a698..3e7094580db 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -31,8 +31,8 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.operations.Operation;
-import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
 import 
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import 
org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation;
 import org.apache.flink.table.planner.operations.converters.MergeTableAsUtil;
 import org.apache.flink.table.planner.utils.MaterializedTableUtils;
 
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 
 /** A converter for {@link SqlCreateOrAlterMaterializedTable}. */
 public class SqlCreateOrAlterMaterializedTableConverter
@@ -100,10 +101,8 @@ public class SqlCreateOrAlterMaterializedTableConverter
             final ConvertContext context,
             final ObjectIdentifier identifier) {
         final MergeContext mergeContext = 
getMergeContext(sqlCreateOrAlterTable, context);
-
-        final List<TableChange> tableChanges = buildTableChanges(oldTable, 
mergeContext);
-
-        return new AlterMaterializedTableAsQueryOperation(identifier, 
tableChanges, oldTable);
+        return new FullAlterMaterializedTableOperation(
+                identifier, buildTableChanges(mergeContext), oldTable);
     }
 
     private Operation handleCreate(
@@ -116,49 +115,37 @@ public class SqlCreateOrAlterMaterializedTableConverter
         return new CreateMaterializedTableOperation(identifier, resolvedTable);
     }
 
-    private List<TableChange> buildTableChanges(
-            final ResolvedCatalogMaterializedTable oldTable, final 
MergeContext mergeContext) {
-        final List<TableChange> changes = new ArrayList<>();
-
-        final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
-        final List<Column> newColumns =
-                MaterializedTableUtils.validateAndExtractNewColumns(
-                        oldSchema, mergeContext.getMergedQuerySchema());
+    private Function<ResolvedCatalogMaterializedTable, List<TableChange>> 
buildTableChanges(
+            final MergeContext mergeContext) {
+        return oldTable -> {
+            final List<TableChange> changes = new ArrayList<>();
 
-        newColumns.forEach(column -> changes.add(TableChange.add(column)));
-        changes.add(
-                TableChange.modifyDefinitionQuery(
-                        mergeContext.getMergedOriginalQuery(),
-                        mergeContext.getMergedExpandedQuery()));
+            final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
+            final List<Column> newColumns =
+                    MaterializedTableUtils.validateAndExtractNewColumns(
+                            oldSchema, mergeContext.getMergedQuerySchema());
 
-        changes.addAll(
-                calculateOptionsChange(
-                        oldTable.getOptions(), 
mergeContext.getMergedTableOptions()));
-
-        return changes;
-    }
+            newColumns.forEach(column -> changes.add(TableChange.add(column)));
+            changes.add(
+                    TableChange.modifyDefinitionQuery(
+                            mergeContext.getMergedOriginalQuery(),
+                            mergeContext.getMergedExpandedQuery()));
 
-    public static List<TableChange> calculateOptionsChange(
-            Map<String, String> oldOptions, Map<String, String> newOptions) {
-        if (oldOptions.equals(newOptions)) {
-            return List.of();
-        }
+            final Map<String, String> oldOptions = oldTable.getOptions();
+            final Map<String, String> newOptions = 
mergeContext.getMergedTableOptions();
 
-        final List<TableChange> changes = new ArrayList<>();
-        for (Map.Entry<String, String> option : newOptions.entrySet()) {
-            final String oldValue = oldOptions.get(option.getKey());
-            if (oldValue == null || !oldValue.equals(option.getValue())) {
-                changes.add(TableChange.set(option.getKey(), 
option.getValue()));
+            for (Map.Entry<String, String> newOptionEntry : 
newOptions.entrySet()) {
+                changes.add(TableChange.set(newOptionEntry.getKey(), 
newOptionEntry.getValue()));
             }
-        }
 
-        for (Map.Entry<String, String> option : oldOptions.entrySet()) {
-            if (newOptions.get(option.getKey()) == null) {
-                changes.add(TableChange.reset(option.getKey()));
+            for (Map.Entry<String, String> oldOptionEntry : 
oldOptions.entrySet()) {
+                if (newOptions.get(oldOptionEntry.getKey()) == null) {
+                    changes.add(TableChange.reset(oldOptionEntry.getKey()));
+                }
             }
-        }
 
-        return changes;
+            return changes;
+        };
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index 70e09e89354..954f876240b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -26,13 +26,13 @@ import 
org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
 import 
org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema;
 import org.apache.flink.sql.parser.ddl.position.SqlTableColumnPosition;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
 import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.Column.ComputedColumn;
 import org.apache.flink.table.catalog.Column.MetadataColumn;
 import org.apache.flink.table.catalog.IntervalFreshness;
-import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableChange.ColumnPosition;
@@ -40,6 +40,7 @@ import 
org.apache.flink.table.planner.operations.PlannerQueryOperation;
 import 
org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
 
 import org.apache.calcite.sql.SqlIntervalLiteral;
+import org.apache.calcite.sql.SqlIntervalLiteral.IntervalValue;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.type.SqlTypeFamily;
@@ -74,8 +75,7 @@ public class MaterializedTableUtils {
                     "Materialized table freshness only support SECOND, MINUTE, 
HOUR, DAY as the time unit.");
         }
 
-        SqlIntervalLiteral.IntervalValue intervalValue =
-                
sqlIntervalLiteral.getValueAs(SqlIntervalLiteral.IntervalValue.class);
+        IntervalValue intervalValue = 
sqlIntervalLiteral.getValueAs(IntervalValue.class);
         String interval = intervalValue.getIntervalLiteral();
         switch (intervalValue.getIntervalQualifier().typeName()) {
             case INTERVAL_DAY:
@@ -127,17 +127,18 @@ public class MaterializedTableUtils {
     // ALTER MATERIALIZED TABLE ... AS ...
     public static List<TableChange> buildSchemaTableChanges(
             ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        if (!isSchemaChanged(oldSchema, newSchema)) {
+            return List.of();
+        }
+
         final List<Column> oldColumns = oldSchema.getColumns();
-        final List<Column> newColumns = newSchema.getColumns();
-        int persistedColumnOffset = 0;
         final Map<String, Tuple2<Column, Integer>> oldColumnSet = new 
HashMap<>();
         for (int i = 0; i < oldColumns.size(); i++) {
             Column column = oldColumns.get(i);
-            if (!column.isPersisted()) {
-                persistedColumnOffset++;
-            }
             oldColumnSet.put(column.getName(), Tuple2.of(oldColumns.get(i), 
i));
         }
+        // Schema retrieved from query doesn't count existing non persisted 
columns
+        final List<Column> newColumns = newSchema.getColumns();
 
         List<TableChange> changes = new ArrayList<>();
         for (int i = 0; i < newColumns.size(); i++) {
@@ -150,8 +151,7 @@ public class MaterializedTableUtils {
             }
 
             // Check if position changed
-            applyPositionChanges(
-                    newColumns, oldColumnToPosition, i + 
persistedColumnOffset, changes);
+            applyPositionChanges(newColumns, oldColumnToPosition, i, changes);
 
             Column oldColumn = oldColumnToPosition.f0;
             // Check if column changed
@@ -208,6 +208,33 @@ public class MaterializedTableUtils {
         return changes;
     }
 
+    // Since it is only for query change, then check only persisted columns 
which could be
+    // changed/added/dropped with such change
+    private static boolean isSchemaChanged(ResolvedSchema oldSchema, 
ResolvedSchema newSchema) {
+        List<Column> oldPersistedColumns =
+                oldSchema.getColumns().stream()
+                        .filter(Column::isPersisted)
+                        .collect(Collectors.toList());
+        if (oldPersistedColumns.size() != newSchema.getColumnCount()) {
+            return true;
+        }
+        for (int i = 0; i < oldPersistedColumns.size(); i++) {
+            Column oldColumn = oldPersistedColumns.get(i);
+            Column newColumn = newSchema.getColumn(i).get();
+            if (!oldColumn.getName().equals(newColumn.getName())) {
+                return true;
+            }
+            if (!newColumn
+                    .getDataType()
+                    .getLogicalType()
+                    .equals(oldColumn.getDataType().getLogicalType())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     private static void applyPositionChanges(
             List<Column> newColumns,
             Tuple2<Column, Integer> oldColumnToPosition,
@@ -290,7 +317,7 @@ public class MaterializedTableUtils {
     }
 
     public static ResolvedSchema getQueryOperationResolvedSchema(
-            ResolvedCatalogMaterializedTable oldTable, ConvertContext context) 
{
+            CatalogMaterializedTable oldTable, ConvertContext context) {
         final SqlNode originalQuery =
                 
context.getFlinkPlanner().parser().parse(oldTable.getOriginalQuery());
         final SqlNode validateQuery = 
context.getSqlValidator().validate(originalQuery);
@@ -302,7 +329,7 @@ public class MaterializedTableUtils {
     }
 
     public static void validatePersistedColumnsUsedByQuery(
-            ResolvedCatalogMaterializedTable oldTable,
+            CatalogMaterializedTable oldTable,
             SqlAlterMaterializedTableSchema alterTableSchema,
             ConvertContext context) {
         final SqlNodeList sqlNodeList = alterTableSchema.getColumnPositions();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 66b338aebb6..2d5aecfff4a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -1553,9 +1553,13 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                 "tb2", false, 1, TableDistribution.of(Kind.HASH, 1, 
List.of("a")), "SELECT 1");
 
         assertThatThrownBy(
-                        () ->
-                                parse(
-                                        "alter materialized table cat1.db1.tb2 
add distribution into 3 buckets"))
+                        () -> {
+                            AlterMaterializedTableChangeOperation 
tableChangeOperation =
+                                    (AlterMaterializedTableChangeOperation)
+                                            parse(
+                                                    "alter materialized table 
cat1.db1.tb2 add distribution into 3 buckets");
+                            tableChangeOperation.getTableChanges();
+                        })
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "The current materialized table has already defined 
the distribution "
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index e56180d633c..9b928b4333d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTabl
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
 import 
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
 import 
org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
+import 
org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation;
 import org.apache.flink.table.planner.utils.TableFunc0;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -178,6 +179,19 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         + "AS SELECT 1";
 
         createMaterializedTableInCatalog(sqlWithNonPersisted, 
"base_mtbl_with_non_persisted");
+
+        // MATERIALIZED TABLE with non persisted columns last
+        final String sqlWithNonPersistedLast =
+                "CREATE MATERIALIZED TABLE base_mtbl_with_non_persisted_last 
(\n"
+                        + "   a INT NOT NULL,"
+                        + "   c AS UPPER(CAST(a AS STRING))"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT 1 as a";
+
+        createMaterializedTableInCatalog(
+                sqlWithNonPersistedLast, "base_mtbl_with_non_persisted_last");
     }
 
     @Test
@@ -389,7 +403,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         () -> {
                             AlterMaterializedTableChangeOperation operation =
                                     (AlterMaterializedTableChangeOperation) 
parse(spec.sql);
-                            operation.getMaterializedTableWithAppliedChanges();
+                            operation.getNewTable();
                         })
                 .as(spec.sql)
                 .isInstanceOf(spec.expectedException)
@@ -408,8 +422,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
     void createAlterTableSuccessCase(TestSpec testSpec) {
         AlterMaterializedTableChangeOperation operation =
                 (AlterMaterializedTableChangeOperation) parse(testSpec.sql);
-        CatalogMaterializedTable catalogMaterializedTable =
-                operation.getMaterializedTableWithAppliedChanges();
+        CatalogMaterializedTable catalogMaterializedTable = 
operation.getNewTable();
         assertThat(catalogMaterializedTable.getUnresolvedSchema())
                 .hasToString(testSpec.expectedSchema);
     }
@@ -538,7 +551,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                 (CatalogMaterializedTable)
                         catalog.getTable(
                                 new 
ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl"));
-        CatalogMaterializedTable newTable = 
op.getMaterializedTableWithAppliedChanges();
+        CatalogMaterializedTable newTable = op.getNewTable();
 
         
assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema());
         assertThat(oldTable.getUnresolvedSchema().getPrimaryKey())
@@ -652,10 +665,9 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         + "AS SELECT a, b, c, d, d as e, cast('123' as string) 
as f FROM t3";
         Operation operation = parse(sql);
 
-        
assertThat(operation).isInstanceOf(AlterMaterializedTableAsQueryOperation.class);
+        
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
 
-        AlterMaterializedTableAsQueryOperation op =
-                (AlterMaterializedTableAsQueryOperation) operation;
+        FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
         assertThat(op.getTableChanges())
                 .containsExactly(
                         TableChange.add(Column.physical("e", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
@@ -668,15 +680,20 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         TableChange.reset("connector"));
         assertThat(operation.asSummaryString())
                 .isEqualTo(
-                        "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS 
SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS 
STRING) AS `f`\n"
-                                + "FROM `builtin`.`default`.`t3` AS `t3`");
+                        "CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.base_mtbl\n"
+                                + "  ADD `e` STRING ,\n"
+                                + "  ADD `f` STRING ,\n"
+                                + " MODIFY DEFINITION QUERY TO 'SELECT 
`t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) 
AS `f`\n"
+                                + "FROM `builtin`.`default`.`t3` AS `t3`',\n"
+                                + "  SET 'format' = 'json2',\n"
+                                + "  RESET 'connector'");
 
         // new table only difference schema & definition query with old table.
         CatalogMaterializedTable oldTable =
                 (CatalogMaterializedTable)
                         catalog.getTable(
                                 new 
ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl"));
-        CatalogMaterializedTable newTable = 
op.getMaterializedTableWithAppliedChanges();
+        CatalogMaterializedTable newTable = op.getNewTable();
 
         assertThat(newTable.getOptions()).containsExactly(Map.entry("format", 
"json2"));
         
assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema());
@@ -748,6 +765,11 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                 + "renaming, and reordering columns are not 
supported.\n"
                                 + "Column mismatch at position 3: Original 
column is [`d` STRING], "
                                 + "but new column is [`d` STRING NOT NULL]."),
+                TestSpec.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted 
AS SELECT '123'",
+                        "ALTER query for MATERIALIZED TABLE "
+                                + "with schema containing non persisted 
columns is not supported, "
+                                + "consider using CREATE OR ALTER MATERIALIZED 
TABLE instead"),
                 TestSpec.of(
                         "ALTER MATERIALIZED TABLE t1 AS SELECT * FROM t1",
                         "ALTER MATERIALIZED TABLE for a table is not 
allowed"));
@@ -1200,7 +1222,23 @@ class SqlMaterializedTableNodeToOperationConverterTest
 
         list.add(
                 TestSpec.withExpectedSchema(
-                        "ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted 
AS SELECT 1",
+                        "CREATE OR ALTER MATERIALIZED TABLE 
base_mtbl_with_non_persisted_last AS SELECT 1 as a",
+                        "(\n"
+                                + "  `c` AS [UPPER(CAST(`a` AS STRING))],\n"
+                                + "  `a` INT NOT NULL\n"
+                                + ")"));
+
+        list.add(
+                TestSpec.withExpectedSchema(
+                        "CREATE OR ALTER MATERIALIZED TABLE 
base_mtbl_with_non_persisted_last AS SELECT 1 as a",
+                        "(\n"
+                                + "  `c` AS [UPPER(CAST(`a` AS STRING))],\n"
+                                + "  `a` INT NOT NULL\n"
+                                + ")"));
+
+        list.add(
+                TestSpec.withExpectedSchema(
+                        "CREATE OR ALTER MATERIALIZED TABLE 
base_mtbl_with_non_persisted AS SELECT 1",
                         "(\n"
                                 + "  `m` STRING METADATA VIRTUAL,\n"
                                 + "  `calc` AS ['a' || 'b'],\n"
@@ -1209,7 +1247,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
 
         list.add(
                 TestSpec.withExpectedSchema(
-                        "ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted 
AS SELECT 2, 'a' AS sec",
+                        "CREATE OR ALTER MATERIALIZED TABLE 
base_mtbl_with_non_persisted AS SELECT 2, 'a' AS sec",
                         "(\n"
                                 + "  `m` STRING METADATA VIRTUAL,\n"
                                 + "  `calc` AS ['a' || 'b'],\n"
@@ -1226,6 +1264,15 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                 + "  `EXPR$0` INT NOT NULL,\n"
                                 + "  `sec` CHAR(1)\n"
                                 + ")"));
+
+        list.add(
+                TestSpec.withExpectedSchema(
+                        // Schema doesn't change, so should be ok
+                        "ALTER MATERIALIZED TABLE 
base_mtbl_with_non_persisted_last AS SELECT 123 as a",
+                        "(\n"
+                                + "  `c` AS [UPPER(CAST(`a` AS STRING))],\n"
+                                + "  `a` INT NOT NULL\n"
+                                + ")"));
         return list;
     }
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/MaterializedTableUtilsTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/MaterializedTableUtilsTest.java
index 17ee036a51d..1859c0f531b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/MaterializedTableUtilsTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/MaterializedTableUtilsTest.java
@@ -48,30 +48,10 @@ class MaterializedTableUtilsTest {
                         schema(physical("a", DataTypes.INT())),
                         schema(physical("a", DataTypes.INT())),
                         List.of()),
-                TestSpec.of(
-                        schema(physical("a", DataTypes.INT())),
-                        schema(physical("a", 
DataTypes.INT()).withComment("comment")),
-                        List.of(
-                                TableChange.modifyColumnComment(
-                                        physical("a", DataTypes.INT()), 
"comment"))),
                 TestSpec.of(
                         schema(physical("a", 
DataTypes.INT()).withComment("comment")),
                         schema(physical("a", 
DataTypes.INT()).withComment("comment")),
                         List.of()),
-                TestSpec.of(
-                        schema(physical("a", 
DataTypes.TIMESTAMP()).withComment("comment")),
-                        schema(physical("a", 
DataTypes.TIMESTAMP()).withComment("comment 2")),
-                        List.of(
-                                TableChange.modifyColumnComment(
-                                        physical("a", 
DataTypes.TIMESTAMP()).withComment("comment"),
-                                        "comment 2"))),
-                TestSpec.of(
-                        schema(physical("a", 
DataTypes.FLOAT()).withComment("comment")),
-                        schema(physical("a", DataTypes.FLOAT())),
-                        List.of(
-                                TableChange.modifyColumnComment(
-                                        physical("a", 
DataTypes.FLOAT()).withComment("comment"),
-                                        null))),
                 TestSpec.of(
                         schema(physical("a", 
DataTypes.INT()).withComment("comment")),
                         schema(physical("a2", 
DataTypes.STRING()).withComment("comment 2")),

Reply via email to