This is an automated email from the ASF dual-hosted git repository.

AHeise pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ac4c4a08e0574f92d75885c872865a69e838312
Author: Arvid Heise <[email protected]>
AuthorDate: Mon May 18 21:13:26 2026 +0200

    [FLINK-39700][table-planner] Name-based column matching; CREATE OR ALTER 
drops absent non-persisted columns
    
    validateAndExtractColumnChanges switches from position-based throws to 
name-based diff: columns are
    matched by name; new columns emit AddColumn; matched columns with 
type/class/comment differences emit
    ModifyPhysicalColumnType/ModifyColumn/ModifyColumnComment; absent columns 
emit DropColumn (skipping
    non-persisted when schemaDefinedInQuery=false). This allows physical DDL 
columns to be declared in a
    different order than the AS SELECT projection.
    
    AlterMaterializedTableChangeOperation gains a protected computeNewTable() 
hook so subclasses can plug
    in a different builder without shadowing the cached newTable field. 
FullAlterMaterializedTableOperation
    overrides the hook with a newTableBuilder lambda supplied by 
SqlCreateOrAlterMaterializedTableConverter
    so non-persisted columns absent from the new DDL are dropped (declarative 
CREATE OR ALTER semantics).
    Tests for validateAndExtractColumnChanges are added alongside.
---
 .../AlterMaterializedTableChangeOperation.java     | 105 ++++----
 .../FullAlterMaterializedTableOperation.java       |  14 +-
 .../AbstractCreateMaterializedTableConverter.java  |   6 +
 ...SqlCreateOrAlterMaterializedTableConverter.java | 130 +++++++---
 .../planner/utils/MaterializedTableUtils.java      | 137 ++++++----
 ...erializedTableNodeToOperationConverterTest.java |   9 +-
 .../utils/ValidateAndExtractColumnChangesTest.java | 281 +++++++++++++++++++++
 7 files changed, 539 insertions(+), 143 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
index 2961dc88a10..823cacb4327 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.operations.materializedtable;
 
-import org.apache.flink.annotation.Confluent;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.api.ValidationException;
@@ -82,33 +81,23 @@ public class AlterMaterializedTableChangeOperation extends 
AlterMaterializedTabl
 
     public CatalogMaterializedTable getNewTable() {
         if (newTable == null) {
-            newTable =
-                    MaterializedTableChangeHandler.buildNewMaterializedTable(
-                            getHandlerWithChanges());
+            newTable = computeNewTable();
         }
         return newTable;
     }
 
-    @Confluent
     public ResolvedCatalogMaterializedTable getOldTable() {
         return oldTable;
     }
 
-    @Confluent
     public void setOldTable(final ResolvedCatalogMaterializedTable oldTable) {
         this.oldTable = oldTable;
+        // All caches are derived from oldTable; invalidate them together.
+        this.tableChanges = null;
+        this.handler = null;
         this.newTable = null;
     }
 
-    protected MaterializedTableChangeHandler getHandlerWithChanges() {
-        if (handler == null) {
-            handler =
-                    MaterializedTableChangeHandler.getHandlerWithChanges(
-                            oldTable, getTableChanges());
-        }
-        return handler;
-    }
-
     @VisibleForTesting
     public void validateChanges() {
         final List<TableChange> changes = getTableChanges();
@@ -137,18 +126,55 @@ public class AlterMaterializedTableChangeOperation 
extends AlterMaterializedTabl
         }
     }
 
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        validateChanges();
+        ctx.getCatalogManager()
+                .alterTable(getNewTable(), getTableChanges(), 
getTableIdentifier(), false);
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    @Override
+    public String asSummaryString() {
+        String changes =
+                getTableChanges().stream()
+                        .map(AlterMaterializedTableChangeOperation::toString)
+                        .collect(Collectors.joining(",\n"));
+        return String.format(
+                "%s %s\n%s", getOperationName(), 
tableIdentifier.asSummaryString(), changes);
+    }
+
+    /** Hook for subclasses to provide a different new-table builder. */
+    protected CatalogMaterializedTable computeNewTable() {
+        return 
MaterializedTableChangeHandler.buildNewMaterializedTable(getHandlerWithChanges());
+    }
+
+    protected MaterializedTableChangeHandler getHandlerWithChanges() {
+        if (handler == null) {
+            handler =
+                    MaterializedTableChangeHandler.getHandlerWithChanges(
+                            oldTable, getTableChanges());
+        }
+        return handler;
+    }
+
+    protected String getOperationName() {
+        return "ALTER MATERIALIZED TABLE";
+    }
+
     private void checkDroppedColumn(
             DropColumn change,
             List<Column> oldColumns,
             Map<String, Integer> columnIndex,
             List<String> errors) {
-        final Integer idx = columnIndex.get(change.getColumnName());
-        if (idx != null && oldColumns.get(idx).isPersisted()) {
-            errors.add(
-                    String.format(
-                            "Dropping of persisted column `%s` is not 
supported.",
-                            change.getColumnName()));
+        final int idx = columnIndex.getOrDefault(change.getColumnName(), -1);
+        if (idx < 0 || !oldColumns.get(idx).isPersisted()) {
+            return;
         }
+        errors.add(
+                String.format(
+                        "Dropping of persisted column `%s` is not supported.",
+                        change.getColumnName()));
     }
 
     private void checkPositionChange(
@@ -172,14 +198,15 @@ public class AlterMaterializedTableChangeOperation 
extends AlterMaterializedTabl
             List<Column> oldColumns,
             Map<String, Integer> columnIndex,
             List<String> errors) {
-        final Integer idx = columnIndex.get(change.getOldColumn().getName());
-        if (idx != null) {
-            errors.add(
-                    positionChangeError(
-                            change.getOldColumn().asSummaryString(),
-                            change.getNewColumn().asSummaryString(),
-                            idx));
+        final int idx = 
columnIndex.getOrDefault(change.getOldColumn().getName(), -1);
+        if (idx < 0) {
+            return;
         }
+        errors.add(
+                positionChangeError(
+                        change.getOldColumn().asSummaryString(),
+                        change.getNewColumn().asSummaryString(),
+                        idx));
     }
 
     /**
@@ -205,28 +232,6 @@ public class AlterMaterializedTableChangeOperation extends 
AlterMaterializedTabl
                 position + 1, oldColumn, newColumn);
     }
 
-    @Override
-    public TableResultInternal execute(Context ctx) {
-        validateChanges();
-        ctx.getCatalogManager()
-                .alterTable(getNewTable(), getTableChanges(), 
getTableIdentifier(), false);
-        return TableResultImpl.TABLE_RESULT_OK;
-    }
-
-    @Override
-    public String asSummaryString() {
-        String changes =
-                getTableChanges().stream()
-                        .map(AlterMaterializedTableChangeOperation::toString)
-                        .collect(Collectors.joining(",\n"));
-        return String.format(
-                "%s %s\n%s", getOperationName(), 
tableIdentifier.asSummaryString(), changes);
-    }
-
-    protected String getOperationName() {
-        return "ALTER MATERIALIZED TABLE";
-    }
-
     private static String toString(TableChange tableChange) {
         if (tableChange instanceof ModifyRefreshStatus) {
             ModifyRefreshStatus refreshStatus = (ModifyRefreshStatus) 
tableChange;
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java
index 4b1d4ae395d..ade473ddeac 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.operations.materializedtable;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.TableChange;
@@ -33,11 +34,22 @@ import java.util.function.Function;
 @Internal
 public class FullAlterMaterializedTableOperation extends 
AlterMaterializedTableChangeOperation {
 
+    private final Function<ResolvedCatalogMaterializedTable, 
CatalogMaterializedTable>
+            newTableBuilder;
+
     public FullAlterMaterializedTableOperation(
             final ObjectIdentifier tableIdentifier,
             final Function<ResolvedCatalogMaterializedTable, 
List<TableChange>> tableChangeForTable,
-            final ResolvedCatalogMaterializedTable oldTable) {
+            final ResolvedCatalogMaterializedTable oldTable,
+            final Function<ResolvedCatalogMaterializedTable, 
CatalogMaterializedTable>
+                    newTableBuilder) {
         super(tableIdentifier, tableChangeForTable, oldTable);
+        this.newTableBuilder = newTableBuilder;
+    }
+
+    @Override
+    protected CatalogMaterializedTable computeNewTable() {
+        return newTableBuilder.apply(getOldTable());
     }
 
     @Override
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
index 2d6d10bdc53..b788f0560e3 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
@@ -81,7 +81,13 @@ public abstract class 
AbstractCreateMaterializedTableConverter<T extends SqlCrea
 
         RefreshMode getMergedRefreshMode();
 
+        LogicalRefreshMode getMergedLogicalRefreshMode();
+
         StartMode getMergedStartMode();
+
+        String getMergedComment();
+
+        IntervalFreshness getMergedFreshness();
     }
 
     protected abstract MergeContext getMergeContext(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index 0d81346262c..c1e35838ead 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -23,7 +23,10 @@ import 
org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateOrAlterMateria
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
 import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
+import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
@@ -37,6 +40,7 @@ import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.operations.Operation;
 import 
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
 import 
org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation;
+import 
org.apache.flink.table.operations.materializedtable.MaterializedTableChangeHandler;
 import org.apache.flink.table.planner.operations.converters.MergeTableAsUtil;
 import org.apache.flink.table.planner.utils.MaterializedTableUtils;
 
@@ -50,7 +54,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.Function;
 
 /** A converter for {@link SqlCreateOrAlterMaterializedTable}. */
 public class SqlCreateOrAlterMaterializedTableConverter
@@ -105,11 +108,40 @@ public class SqlCreateOrAlterMaterializedTableConverter
             final ResolvedCatalogMaterializedTable oldTable,
             final ConvertContext context,
             final ObjectIdentifier identifier) {
+        final SchemaResolver schemaResolver = 
context.getCatalogManager().getSchemaResolver();
         final MergeContext mergeContext = 
getMergeContext(sqlCreateOrAlterTable, context);
         return new FullAlterMaterializedTableOperation(
                 identifier,
-                buildTableChanges(mergeContext, 
context.getCatalogManager().getSchemaResolver()),
-                oldTable);
+                currentTable -> buildTableChanges(currentTable, mergeContext, 
schemaResolver),
+                oldTable,
+                currentTable -> buildNewTable(currentTable, mergeContext, 
schemaResolver));
+    }
+
+    private CatalogMaterializedTable buildNewTable(
+            final ResolvedCatalogMaterializedTable currentTable,
+            final MergeContext mergeContext,
+            final SchemaResolver schemaResolver) {
+        return CatalogMaterializedTable.newBuilder()
+                .schema(
+                        MaterializedTableChangeHandler.getHandlerWithChanges(
+                                        currentTable,
+                                        getSchemaTableChanges(
+                                                mergeContext, schemaResolver, 
currentTable))
+                                .retrieveSchema())
+                .comment(mergeContext.getMergedComment())
+                .partitionKeys(mergeContext.getMergedPartitionKeys())
+                .options(mergeContext.getMergedTableOptions())
+                .originalQuery(mergeContext.getMergedOriginalQuery())
+                .expandedQuery(mergeContext.getMergedExpandedQuery())
+                
.distribution(mergeContext.getMergedTableDistribution().orElse(null))
+                .freshness(mergeContext.getMergedFreshness())
+                .logicalRefreshMode(mergeContext.getMergedLogicalRefreshMode())
+                .refreshMode(mergeContext.getMergedRefreshMode())
+                .refreshStatus(currentTable.getRefreshStatus())
+                
.refreshHandlerDescription(currentTable.getRefreshHandlerDescription().orElse(null))
+                
.serializedRefreshHandler(currentTable.getSerializedRefreshHandler())
+                .startMode(mergeContext.getMergedStartMode())
+                .build();
     }
 
     private Operation handleCreate(
@@ -122,33 +154,37 @@ public class SqlCreateOrAlterMaterializedTableConverter
         return new CreateMaterializedTableOperation(identifier, resolvedTable);
     }
 
-    private Function<ResolvedCatalogMaterializedTable, List<TableChange>> 
buildTableChanges(
-            final MergeContext mergeContext, final SchemaResolver 
schemaResolver) {
-        return oldTable -> {
-            final List<TableChange> changes =
-                    getSchemaTableChanges(mergeContext, schemaResolver, 
oldTable);
+    private List<TableChange> buildTableChanges(
+            final ResolvedCatalogMaterializedTable oldTable,
+            final MergeContext mergeContext,
+            final SchemaResolver schemaResolver) {
+        final List<TableChange> changes =
+                getSchemaTableChanges(mergeContext, schemaResolver, oldTable);
 
-            changes.addAll(getQueryTableChanges(mergeContext, oldTable));
-            changes.addAll(getOptionsTableChanges(mergeContext, oldTable));
-            changes.addAll(getDistributionTableChanges(mergeContext, 
oldTable));
+        changes.addAll(getQueryTableChanges(mergeContext, oldTable));
+        changes.addAll(getOptionsTableChanges(mergeContext, oldTable));
+        changes.addAll(getDistributionTableChanges(mergeContext, oldTable));
 
-            final RefreshMode oldRefreshMode = oldTable.getRefreshMode();
-            final RefreshMode newRefreshMode = 
mergeContext.getMergedRefreshMode();
-            if (oldRefreshMode != newRefreshMode && newRefreshMode != null) {
-                throw new ValidationException("Changing of REFRESH MODE is 
unsupported");
-            }
+        final RefreshMode oldRefreshMode = oldTable.getRefreshMode();
+        final RefreshMode newRefreshMode = mergeContext.getMergedRefreshMode();
+        if (oldRefreshMode != newRefreshMode && newRefreshMode != null) {
+            throw new ValidationException("Changing of REFRESH MODE is 
unsupported");
+        }
 
-            final StartMode newStartMode = mergeContext.getMergedStartMode();
+        final StartMode newStartMode = mergeContext.getMergedStartMode();
+        if (newStartMode != null) {
             final StartMode oldStartMode =
                     oldTable.getStartMode()
                             .orElseThrow(
-                                    () -> new ValidationException("START_MODE 
must not be null"));
+                                    () ->
+                                            new ValidationException(
+                                                    "Start mode must be set on 
materialized table."));
             if (!Objects.equals(oldStartMode, newStartMode)) {
                 changes.add(TableChange.modifyStartMode(newStartMode));
             }
+        }
 
-            return changes;
-        };
+        return changes;
     }
 
     private List<TableChange> getDistributionTableChanges(
@@ -295,21 +331,12 @@ public class SqlCreateOrAlterMaterializedTableConverter
 
             @Override
             public Schema getMergedSchema() {
-                final Set<String> querySchemaColumnNames =
-                        new HashSet<>(querySchema.getColumnNames());
                 final SqlNodeList sqlNodeList = 
sqlCreateMaterializedTable.getColumnList();
-                for (SqlNode column : sqlNodeList) {
-                    if (!(column instanceof SqlRegularColumn)) {
-                        continue;
-                    }
-
-                    SqlRegularColumn physicalColumn = (SqlRegularColumn) 
column;
-                    if 
(!querySchemaColumnNames.contains(physicalColumn.getName().getSimple())) {
-                        throw new ValidationException(
-                                String.format(
-                                        "Invalid as physical column '%s' is 
defined in the DDL, but is not used in a query column.",
-                                        physicalColumn.getName().getSimple()));
-                    }
+                if (createOrAlterOperation(sqlCreateMaterializedTable)) {
+                    MaterializedTableUtils.validatePersistedColumnsUsedByQuery(
+                            sqlNodeList, querySchema);
+                } else {
+                    validatePhysicalColumnsUsedByQuery(sqlNodeList, 
querySchema);
                 }
                 if 
(sqlCreateMaterializedTable.isSchemaWithColumnsIdentifiersOnly()) {
                     // If only column identifiers are provided, then these are 
used to
@@ -358,14 +385,45 @@ public class SqlCreateOrAlterMaterializedTableConverter
 
             @Override
             public RefreshMode getMergedRefreshMode() {
-                return getDerivedRefreshMode(
-                        
getDerivedLogicalRefreshMode(sqlCreateMaterializedTable));
+                return getDerivedRefreshMode(getMergedLogicalRefreshMode());
+            }
+
+            @Override
+            public LogicalRefreshMode getMergedLogicalRefreshMode() {
+                return 
getDerivedLogicalRefreshMode(sqlCreateMaterializedTable);
             }
 
             @Override
             public StartMode getMergedStartMode() {
                 return getStartMode(sqlCreateMaterializedTable, context);
             }
+
+            @Override
+            public String getMergedComment() {
+                return getComment(sqlCreateMaterializedTable);
+            }
+
+            @Override
+            public IntervalFreshness getMergedFreshness() {
+                return getDerivedFreshness(sqlCreateMaterializedTable);
+            }
         };
     }
+
+    private static void validatePhysicalColumnsUsedByQuery(
+            SqlNodeList sqlNodeList, ResolvedSchema querySchema) {
+        final Set<String> querySchemaColumnNames = new 
HashSet<>(querySchema.getColumnNames());
+        for (SqlNode column : sqlNodeList) {
+            if (!(column instanceof SqlRegularColumn)) {
+                continue;
+            }
+            final SqlRegularColumn physicalColumn = (SqlRegularColumn) column;
+            if 
(!querySchemaColumnNames.contains(physicalColumn.getName().getSimple())) {
+                throw new ValidationException(
+                        String.format(
+                                "Invalid as physical column '%s' is defined in 
the DDL, but is not used in a query column.",
+                                physicalColumn.getName().getSimple()));
+            }
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index 99fd660e836..e454eb7dd8c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -54,7 +54,6 @@ import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlTimestampLiteral;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.TimestampString;
-import org.apache.commons.lang3.StringUtils;
 
 import java.math.BigDecimal;
 import java.time.Instant;
@@ -447,68 +446,106 @@ public class MaterializedTableUtils {
 
     public static List<TableChange> validateAndExtractColumnChanges(
             ResolvedSchema oldSchema, ResolvedSchema newSchema, boolean 
schemaDefinedInQuery) {
-        final List<Column> newColumns = getPersistedColumns(newSchema);
-        final List<Column> oldColumns = getPersistedColumns(oldSchema);
-        final int originalColumnSize = oldColumns.size();
-        final int newColumnSize = newColumns.size();
-
-        if (originalColumnSize > newColumnSize) {
-            throw new ValidationException(
-                    String.format(
-                            "Failed to modify query because drop column is 
unsupported. "
-                                    + "When modifying a query, you can only 
append new columns at the end of original schema. "
-                                    + "The original schema has %d columns, but 
the newly derived schema from the query has %d columns.",
-                            originalColumnSize, newColumnSize));
-        }
-
-        final List<TableChange> columnChanges = new ArrayList<>();
+        final List<Column> oldColumns = oldSchema.getColumns();
+        final Map<String, Tuple2<Column, Integer>> oldByName = new HashMap<>();
         for (int i = 0; i < oldColumns.size(); i++) {
-            final Column oldColumn = oldColumns.get(i);
-            final Column newColumn = newColumns.get(i);
-            final DataType newColumnDataType =
-                    getNewColumnDatatype(oldColumn, newColumns.get(i), 
schemaDefinedInQuery);
-            if (!oldColumn.equals(newColumn)) {
-                if (!oldColumn.getName().equals(newColumn.getName())
-                        || !oldColumn.getDataType().equals(newColumnDataType)) 
{
-                    throw new ValidationException(
-                            String.format(
-                                    "When modifying the query of a 
materialized table, "
-                                            + "currently only support 
appending columns at the end of original schema, dropping, renaming, and 
reordering columns are not supported.\n"
-                                            + "Column mismatch at position %d: 
Original column is [%s], but new column is [%s].",
-                                    i + 1, oldColumn, newColumn));
-                }
+            oldByName.put(oldColumns.get(i).getName(), 
Tuple2.of(oldColumns.get(i), i));
+        }
+        final Set<String> seen = new HashSet<>();
+        final List<Column> newColumns = newSchema.getColumns();
+        final List<TableChange> changes = new ArrayList<>();
+        for (int newIndex = 0; newIndex < newColumns.size(); newIndex++) {
+            final Column newColumn = newColumns.get(newIndex);
+            seen.add(newColumn.getName());
+            final Tuple2<Column, Integer> oldEntry = 
oldByName.get(newColumn.getName());
+            if (oldEntry == null) {
+                changes.add(addChange(newColumn, schemaDefinedInQuery));
+                continue;
+            }
+            final Column oldColumn = oldEntry.f0;
+            // No position diff: DDL order is arbitrary; query-driven reorders 
are caught by
+            // buildSchemaTableChanges on the ALTER MT AS path.
+            if (oldColumn.isPhysical()
+                    && newColumn.isPhysical()
+                    && typeChanged(oldColumn, newColumn, 
schemaDefinedInQuery)) {
+                final DataType newType =
+                        schemaDefinedInQuery
+                                ? newColumn.getDataType()
+                                : newColumn.getDataType().nullable();
+                changes.add(TableChange.modifyPhysicalColumnType(oldColumn, 
newType));
+                // Type changed; still check whether the comment also changed.
                 final String oldComment = oldColumn.getComment().orElse(null);
                 final String newComment = newColumn.getComment().orElse(null);
-
-                if (StringUtils.isEmpty(oldComment) != 
StringUtils.isEmpty(newComment)
-                        || StringUtils.isNotEmpty(oldComment)
-                                && !Objects.equals(oldComment, newComment)) {
-                    
columnChanges.add(TableChange.modifyColumnComment(oldColumn, newComment));
+                if (!Objects.equals(oldComment, newComment)) {
+                    changes.add(TableChange.modifyColumnComment(oldColumn, 
newComment));
                 }
+                continue;
+            }
+            if (oldColumn.getClass() != newColumn.getClass()
+                    || !definitionEquals(oldColumn, newColumn)) {
+                changes.add(
+                        new TableChange.ModifyColumn(
+                                oldColumn,
+                                normalizedColumn(newColumn, 
schemaDefinedInQuery),
+                                null));
+                continue;
+            }
+            final String oldComment = oldColumn.getComment().orElse(null);
+            final String newComment = newColumn.getComment().orElse(null);
+            if (!Objects.equals(oldComment, newComment)) {
+                changes.add(TableChange.modifyColumnComment(oldColumn, 
newComment));
             }
         }
 
-        for (int i = oldColumns.size(); i < newColumns.size(); i++) {
-            Column newColumn = newColumns.get(i);
-            columnChanges.add(
-                    TableChange.add(
-                            schemaDefinedInQuery
-                                    ? newColumn
-                                    : 
newColumn.copy(newColumn.getDataType().nullable())));
+        for (Map.Entry<String, Tuple2<Column, Integer>> entry : 
oldByName.entrySet()) {
+            if (seen.contains(entry.getKey())) {
+                continue;
+            }
+            // Without an explicit DDL column list the new schema only 
reflects the query
+            // projection, so old non-persisted columns are retained, not 
dropped.
+            if (!schemaDefinedInQuery && !entry.getValue().f0.isPersisted()) {
+                continue;
+            }
+            changes.add(TableChange.dropColumn(entry.getKey()));
         }
+        return changes;
+    }
 
-        return columnChanges;
+    private static TableChange.AddColumn addChange(Column column, boolean 
schemaDefinedInQuery) {
+        return TableChange.add(normalizedColumn(column, schemaDefinedInQuery));
     }
 
-    private static DataType getNewColumnDatatype(
-            Column oldColumn, Column newColumn, boolean schemaDefinedInQuery) {
-        if (schemaDefinedInQuery) {
-            return newColumn.getDataType();
+    private static Column normalizedColumn(Column column, boolean 
schemaDefinedInQuery) {
+        return schemaDefinedInQuery ? column : 
column.copy(column.getDataType().nullable());
+    }
+
+    private static boolean definitionEquals(Column oldColumn, Column 
newColumn) {
+        if (oldColumn instanceof MetadataColumn && newColumn instanceof 
MetadataColumn) {
+            final MetadataColumn oldMeta = (MetadataColumn) oldColumn;
+            final MetadataColumn newMeta = (MetadataColumn) newColumn;
+            return oldMeta.isVirtual() == newMeta.isVirtual()
+                    && Objects.equals(
+                            oldMeta.getMetadataKey().orElse(null),
+                            newMeta.getMetadataKey().orElse(null))
+                    && oldMeta.getDataType().equals(newMeta.getDataType());
         }
-        if 
(oldColumn.getDataType().nullable().equals(newColumn.getDataType().nullable())) 
{
-            return oldColumn.getDataType();
+        if (oldColumn instanceof ComputedColumn && newColumn instanceof 
ComputedColumn) {
+            return Objects.equals(
+                    ((ComputedColumn) oldColumn).getExpression(),
+                    ((ComputedColumn) newColumn).getExpression());
         }
-        return newColumn.getDataType();
+        return true;
+    }
+
+    private static boolean typeChanged(
+            Column oldColumn, Column newColumn, boolean schemaDefinedInQuery) {
+        final DataType oldType = oldColumn.getDataType();
+        final DataType newType = newColumn.getDataType();
+        // schemaDefinedInQuery=false: schema is inferred from the query, 
which may flip
+        // nullability without intent — only the base type difference is a 
real change.
+        return schemaDefinedInQuery
+                ? !oldType.equals(newType)
+                : !oldType.nullable().equals(newType.nullable());
     }
 
     public static ResolvedSchema getQueryOperationResolvedSchema(
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index caaf0578db2..6bb5fc5503d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -1266,13 +1266,10 @@ class SqlMaterializedTableNodeToOperationConverterTest
 
         list.add(
                 TestSpec.withExpectedSchema(
+                        // Explicit DDL omits `m` and `calc` — CREATE OR ALTER 
is declarative,
+                        // so non-persisted columns absent from the DDL are 
dropped.
                         "CREATE OR ALTER MATERIALIZED TABLE 
base_mtbl_with_non_persisted (`EXPR$0` INT NOT NULL, `sec` CHAR(1)) AS SELECT 
2, 'a' AS sec",
-                        "(\n"
-                                + "  `m` STRING METADATA VIRTUAL,\n"
-                                + "  `calc` AS ['a' || 'b'],\n"
-                                + "  `EXPR$0` INT NOT NULL,\n"
-                                + "  `sec` CHAR(1)\n"
-                                + ")"));
+                        "(\n" + "  `EXPR$0` INT NOT NULL,\n" + "  `sec` 
CHAR(1)\n" + ")"));
 
         list.add(
                 TestSpec.withExpectedSchema(
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ValidateAndExtractColumnChangesTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ValidateAndExtractColumnChangesTest.java
new file mode 100644
index 00000000000..3d3f177af24
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ValidateAndExtractColumnChangesTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
+import org.apache.flink.table.types.DataType;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.table.catalog.Column.computed;
+import static org.apache.flink.table.catalog.Column.metadata;
+import static org.apache.flink.table.catalog.Column.physical;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link MaterializedTableUtils#validateAndExtractColumnChanges}. 
*/
+class ValidateAndExtractColumnChangesTest {
+
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("input")
+    void test(TestSpec spec) {
+        assertThat(
+                        MaterializedTableUtils.validateAndExtractColumnChanges(
+                                spec.oldSchema, spec.newSchema, 
spec.schemaDefinedInQuery))
+                .containsExactlyInAnyOrderElementsOf(spec.expected);
+    }
+
+    private static Collection<TestSpec> input() {
+        return List.of(
+                TestSpec.of(
+                        "identical schemas",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(physical("a", DataTypes.INT())),
+                        true,
+                        List.of()),
+                TestSpec.of(
+                        "comment added",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(physical("a", 
DataTypes.INT()).withComment("hello")),
+                        true,
+                        List.of(
+                                TableChange.modifyColumnComment(
+                                        physical("a", DataTypes.INT()), 
"hello"))),
+                TestSpec.of(
+                        "comment removed",
+                        schema(physical("a", 
DataTypes.INT()).withComment("hello")),
+                        schema(physical("a", DataTypes.INT())),
+                        true,
+                        List.of(
+                                TableChange.modifyColumnComment(
+                                        physical("a", 
DataTypes.INT()).withComment("hello"),
+                                        null))),
+                TestSpec.of(
+                        "comment changed",
+                        schema(physical("a", 
DataTypes.INT()).withComment("old")),
+                        schema(physical("a", 
DataTypes.INT()).withComment("new")),
+                        true,
+                        List.of(
+                                TableChange.modifyColumnComment(
+                                        physical("a", 
DataTypes.INT()).withComment("old"), "new"))),
+                TestSpec.of(
+                        "single column appended",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(physical("a", DataTypes.INT()), physical("b", 
DataTypes.STRING())),
+                        true,
+                        List.of(TableChange.add(physical("b", 
DataTypes.STRING())))),
+                TestSpec.of(
+                        "multiple columns appended",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                physical("b", DataTypes.STRING()),
+                                physical("c", DataTypes.BOOLEAN())),
+                        true,
+                        List.of(
+                                TableChange.add(physical("b", 
DataTypes.STRING())),
+                                TableChange.add(physical("c", 
DataTypes.BOOLEAN())))),
+                TestSpec.of(
+                        "nullability differs but schema is not defined in 
query",
+                        schema(physical("a", DataTypes.INT().notNull())),
+                        schema(physical("a", DataTypes.INT())),
+                        false,
+                        List.of()),
+                TestSpec.of(
+                        "computed columns are ignored in persisted comparison",
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                computed("comp", expr(DataTypes.INT()))),
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                physical("b", DataTypes.STRING()),
+                                computed("comp", expr(DataTypes.INT()))),
+                        true,
+                        List.of(TableChange.add(physical("b", 
DataTypes.STRING())))),
+                TestSpec.of(
+                        "virtual metadata column drop emits dropColumn",
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                metadata("v", DataTypes.INT(), null, true)),
+                        schema(physical("a", DataTypes.INT())),
+                        true,
+                        List.of(TableChange.dropColumn("v"))),
+                TestSpec.of(
+                        "non-virtual metadata column treated as persisted",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                metadata("m", DataTypes.STRING(), null, 
false)),
+                        true,
+                        List.of(TableChange.add(metadata("m", 
DataTypes.STRING(), null, false)))),
+                TestSpec.of(
+                        "schemaDefinedInQuery=false makes added column 
nullable",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                physical("b", DataTypes.STRING().notNull())),
+                        false,
+                        List.of(TableChange.add(physical("b", 
DataTypes.STRING())))),
+                TestSpec.of(
+                        "drop persisted column emits dropColumn",
+                        schema(physical("a", DataTypes.INT()), physical("b", 
DataTypes.STRING())),
+                        schema(physical("a", DataTypes.INT())),
+                        true,
+                        List.of(TableChange.dropColumn("b"))),
+                TestSpec.of(
+                        "rename persisted column emits drop + add",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(physical("b", DataTypes.INT())),
+                        true,
+                        List.of(
+                                TableChange.dropColumn("a"),
+                                TableChange.add(physical("b", 
DataTypes.INT())))),
+                TestSpec.of(
+                        "persisted type change emits modifyPhysicalColumnType",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(physical("a", DataTypes.STRING())),
+                        true,
+                        List.of(
+                                TableChange.modifyPhysicalColumnType(
+                                        physical("a", DataTypes.INT()), 
DataTypes.STRING()))),
+                TestSpec.of(
+                        "nullability change with schemaDefinedInQuery=true 
emits modifyPhysicalColumnType",
+                        schema(physical("a", DataTypes.INT().notNull())),
+                        schema(physical("a", DataTypes.INT())),
+                        true,
+                        List.of(
+                                TableChange.modifyPhysicalColumnType(
+                                        physical("a", 
DataTypes.INT().notNull()),
+                                        DataTypes.INT()))),
+                TestSpec.of(
+                        "type change + comment change both emitted",
+                        schema(physical("a", 
DataTypes.INT()).withComment("old")),
+                        schema(physical("a", 
DataTypes.STRING()).withComment("new")),
+                        true,
+                        List.of(
+                                TableChange.modifyPhysicalColumnType(
+                                        physical("a", 
DataTypes.INT()).withComment("old"),
+                                        DataTypes.STRING()),
+                                TableChange.modifyColumnComment(
+                                        physical("a", 
DataTypes.INT()).withComment("old"), "new"))),
+                TestSpec.of(
+                        "reorder persisted columns is silent (DDL order is 
arbitrary)",
+                        schema(physical("a", DataTypes.INT()), physical("b", 
DataTypes.STRING())),
+                        schema(physical("b", DataTypes.STRING()), 
physical("a", DataTypes.INT())),
+                        true,
+                        List.of()),
+                TestSpec.of(
+                        "add computed column",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                computed("comp", expr(DataTypes.INT()))),
+                        true,
+                        List.of(TableChange.add(computed("comp", 
expr(DataTypes.INT()))))),
+                TestSpec.of(
+                        "add virtual metadata column",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                metadata("v", DataTypes.STRING(), null, true)),
+                        true,
+                        List.of(TableChange.add(metadata("v", 
DataTypes.STRING(), null, true)))),
+                TestSpec.of(
+                        "drop computed column",
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                computed("comp", expr(DataTypes.INT()))),
+                        schema(physical("a", DataTypes.INT())),
+                        true,
+                        List.of(TableChange.dropColumn("comp"))),
+                TestSpec.of(
+                        "drop virtual metadata column",
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                metadata("v", DataTypes.STRING(), null, true)),
+                        schema(physical("a", DataTypes.INT())),
+                        true,
+                        List.of(TableChange.dropColumn("v"))),
+                TestSpec.of(
+                        "modify computed column expression emits modifyColumn",
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                computed("comp", expr(DataTypes.INT()))),
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                computed("comp", expr(DataTypes.BIGINT()))),
+                        true,
+                        List.of(
+                                new TableChange.ModifyColumn(
+                                        computed("comp", 
expr(DataTypes.INT())),
+                                        computed("comp", 
expr(DataTypes.BIGINT())),
+                                        null))));
+    }
+
+    private static ResolvedSchema schema(Column... columns) {
+        return ResolvedSchema.of(columns);
+    }
+
+    private static ResolvedExpression expr(DataType type) {
+        return new ResolvedExpressionMock(type, () -> "1");
+    }
+
+    private static class TestSpec {
+        private final String name;
+        private final ResolvedSchema oldSchema;
+        private final ResolvedSchema newSchema;
+        private final boolean schemaDefinedInQuery;
+        private final List<TableChange> expected;
+
+        TestSpec(
+                String name,
+                ResolvedSchema oldSchema,
+                ResolvedSchema newSchema,
+                boolean schemaDefinedInQuery,
+                List<TableChange> expected) {
+            this.name = name;
+            this.oldSchema = oldSchema;
+            this.newSchema = newSchema;
+            this.schemaDefinedInQuery = schemaDefinedInQuery;
+            this.expected = expected;
+        }
+
+        static TestSpec of(
+                String name,
+                ResolvedSchema oldSchema,
+                ResolvedSchema newSchema,
+                boolean schemaDefinedInQuery,
+                List<TableChange> expected) {
+            return new TestSpec(name, oldSchema, newSchema, 
schemaDefinedInQuery, expected);
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+}


Reply via email to