This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9260b33cc5109c6dd2042a8f0ce645b6d56d4d57
Author: Feng Jin <jinfeng1...@gmail.com>
AuthorDate: Sun Jan 5 20:05:38 2025 +0800

    [FLINK-36994][table] Support converting ALTER MATERIALIZED TABLE AS node to operation
---
 .../AlterMaterializedTableAsQueryOperation.java    |  66 +++++++++
 .../AlterMaterializedTableChangeOperation.java     |  28 ++--
 .../apache/flink/table/catalog/TableChange.java    |  93 ++++++++++--
 .../SqlAlterMaterializedTableAsQueryConverter.java | 161 +++++++++++++++++++++
 .../operations/converters/SqlNodeConverters.java   |   1 +
 ...erializedTableNodeToOperationConverterTest.java | 143 ++++++++++++++++++
 6 files changed, 461 insertions(+), 31 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java
new file mode 100644
index 00000000000..331e8cd0220
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableChange.MaterializedTableChange;
+
+import java.util.List;
+
+/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
+@Internal
+public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation {
+
+    private final List<MaterializedTableChange> tableChanges;
+
+    private final CatalogMaterializedTable newMaterializedTable;
+
+    public AlterMaterializedTableAsQueryOperation(
+            ObjectIdentifier tableIdentifier,
+            List<MaterializedTableChange> tableChanges,
+            CatalogMaterializedTable newMaterializedTable) {
+        super(tableIdentifier);
+        this.tableChanges = tableChanges;
+        this.newMaterializedTable = newMaterializedTable;
+    }
+
+    public List<MaterializedTableChange> getTableChanges() {
+        return tableChanges;
+    }
+
+    public CatalogMaterializedTable getNewMaterializedTable() {
+        return newMaterializedTable;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        throw new UnsupportedOperationException(
+                "AlterMaterializedTableAsQueryOperation doesn't support ExecutableOperation yet.");
+    }
+
+    @Override
+    public String asSummaryString() {
+        return String.format(
+                "ALTER MATERIALIZED TABLE %s AS %s",
+                tableIdentifier.asSummaryString(), newMaterializedTable.getDefinitionQuery());
+    }
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
index 49f220a8ddc..d71946c7a56 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableChange.MaterializedTableChange;
 import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
 
 import java.util.List;
@@ -35,19 +36,19 @@ import java.util.stream.Collectors;
 @Internal
 public class AlterMaterializedTableChangeOperation extends AlterMaterializedTableOperation {
 
-    private final List<TableChange> tableChanges;
+    private final List<MaterializedTableChange> tableChanges;
     private final CatalogMaterializedTable catalogMaterializedTable;
 
     public AlterMaterializedTableChangeOperation(
             ObjectIdentifier tableIdentifier,
-            List<TableChange> tableChanges,
+            List<MaterializedTableChange> tableChanges,
             CatalogMaterializedTable catalogMaterializedTable) {
         super(tableIdentifier);
         this.tableChanges = tableChanges;
         this.catalogMaterializedTable = catalogMaterializedTable;
     }
 
-    public List<TableChange> getTableChanges() {
+    public List<MaterializedTableChange> getTableChanges() {
         return tableChanges;
     }
 
@@ -72,22 +73,13 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl
     public String asSummaryString() {
         String changes =
                 tableChanges.stream()
-                        .map(
-                                tableChange -> {
-                                    if (tableChange
-                                            instanceof TableChange.MaterializedTableChange) {
-                                        return toString(
-                                                (TableChange.MaterializedTableChange) tableChange);
-                                    } else {
-                                        return AlterTableChangeOperation.toString(tableChange);
-                                    }
-                                })
+                        .map(AlterMaterializedTableChangeOperation::toString)
                         .collect(Collectors.joining(",\n"));
         return String.format(
                 "ALTER MATERIALIZED TABLE %s\n%s", 
tableIdentifier.asSummaryString(), changes);
     }
 
-    private String toString(TableChange.MaterializedTableChange tableChange) {
+    private static String toString(MaterializedTableChange tableChange) {
         if (tableChange instanceof TableChange.ModifyRefreshStatus) {
             TableChange.ModifyRefreshStatus refreshStatus =
                     (TableChange.ModifyRefreshStatus) tableChange;
@@ -99,9 +91,13 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl
             return String.format(
                     "  MODIFY REFRESH HANDLER DESCRIPTION TO '%s'",
                     refreshHandler.getRefreshHandlerDesc());
+        } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
+            TableChange.ModifyDefinitionQuery definitionQuery =
+                    (TableChange.ModifyDefinitionQuery) tableChange;
+            return String.format(
+                    " MODIFY DEFINITION QUERY TO '%s'", definitionQuery.getDefinitionQuery());
         } else {
-            throw new UnsupportedOperationException(
-                    String.format("Unknown materialized table change: %s.", tableChange));
+            return AlterTableChangeOperation.toString(tableChange);
         }
     }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
index f5531f3bcf3..76ac98bd38d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java
@@ -28,7 +28,7 @@ import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.Objects;
 
-/** {@link TableChange} represents the modification of the table. */
+/** {@link TableChange} represents the modification of the {@link CatalogBaseTable}. */
 @PublicEvolving
 public interface TableChange {
 
@@ -380,6 +380,16 @@ public interface TableChange {
         return new ModifyRefreshHandler(refreshHandlerDesc, refreshHandlerBytes);
     }
 
+    /**
+     * A table change to modify materialized table definition query.
+     *
+     * @param definitionQuery the modified definition query.
+     * @return a TableChange represents the modification.
+     */
+    static ModifyDefinitionQuery modifyDefinitionQuery(String definitionQuery) {
+        return new ModifyDefinitionQuery(definitionQuery);
+    }
+
     // --------------------------------------------------------------------------------------------
     // Add Change
     // --------------------------------------------------------------------------------------------
@@ -392,9 +402,13 @@ public interface TableChange {
      * <pre>
      *    ALTER TABLE &lt;table_name&gt; ADD &lt;column_definition&gt; &lt;column_position&gt;
      * </pre>
+     *
+     * <p>Note: An <code>ALTER MATERIALIZED TABLE AS QUERY</code> operation may also produce an
+     * <code>AddColumn</code> change. This occurs when the materialized table's schema is updated to
+     * align with the structure of the query results, which might require adding new columns.
      */
     @PublicEvolving
-    class AddColumn implements TableChange {
+    class AddColumn implements CatalogTableChange, MaterializedTableChange {
 
         private final Column column;
         private final ColumnPosition position;
@@ -447,7 +461,7 @@ public interface TableChange {
      * </pre>
      */
     @PublicEvolving
-    class AddUniqueConstraint implements TableChange {
+    class AddUniqueConstraint implements CatalogTableChange {
 
         private final UniqueConstraint constraint;
 
@@ -493,7 +507,7 @@ public interface TableChange {
      * </pre>
      */
     @PublicEvolving
-    class AddDistribution implements TableChange {
+    class AddDistribution implements CatalogTableChange {
 
         private final TableDistribution distribution;
 
@@ -539,7 +553,7 @@ public interface TableChange {
      * </pre>
      */
     @PublicEvolving
-    class AddWatermark implements TableChange {
+    class AddWatermark implements CatalogTableChange {
 
         private final WatermarkSpec watermarkSpec;
 
@@ -601,7 +615,7 @@ public interface TableChange {
      * </pre>
      */
     @PublicEvolving
-    class ModifyColumn implements TableChange {
+    class ModifyColumn implements CatalogTableChange {
 
         protected final Column oldColumn;
         protected final Column newColumn;
@@ -848,7 +862,7 @@ public interface TableChange {
      * </pre>
      */
     @PublicEvolving
-    class ModifyUniqueConstraint implements TableChange {
+    class ModifyUniqueConstraint implements CatalogTableChange {
 
         private final UniqueConstraint newConstraint;
 
@@ -894,7 +908,7 @@ public interface TableChange {
      * </pre>
      */
     @PublicEvolving
-    class ModifyDistribution implements TableChange {
+    class ModifyDistribution implements CatalogTableChange {
 
         private final TableDistribution distribution;
 
@@ -940,7 +954,7 @@ public interface TableChange {
      * </pre>
      */
     @PublicEvolving
-    class ModifyWatermark implements TableChange {
+    class ModifyWatermark implements CatalogTableChange {
 
         private final WatermarkSpec newWatermark;
 
@@ -988,9 +1002,14 @@ public interface TableChange {
      * <pre>
      *    ALTER TABLE &lt;table_name&gt; DROP COLUMN &lt;column_name&gt;
      * </pre>
+     *
+     * <p>Note: A <code>DropColumn</code> change may also occur when rolling back the schema during
+     * a failed <code>ALTER MATERIALIZED TABLE AS QUERY</code> operation. If the operation fails,
+     * columns added to align with the query results may need to be removed to restore the original
+     * schema.
      */
     @PublicEvolving
-    class DropColumn implements TableChange {
+    class DropColumn implements CatalogTableChange, MaterializedTableChange {
 
         private final String columnName;
 
@@ -1036,7 +1055,7 @@ public interface TableChange {
      * </pre>
      */
     @PublicEvolving
-    class DropWatermark implements TableChange {
+    class DropWatermark implements CatalogTableChange {
         static final DropWatermark INSTANCE = new DropWatermark();
 
         @Override
@@ -1055,7 +1074,7 @@ public interface TableChange {
      * </pre>
      */
     @PublicEvolving
-    class DropConstraint implements TableChange {
+    class DropConstraint implements CatalogTableChange {
 
         private final String constraintName;
 
@@ -1101,7 +1120,7 @@ public interface TableChange {
      * </pre>
      */
     @PublicEvolving
-    class DropDistribution implements TableChange {
+    class DropDistribution implements CatalogTableChange {
         static final DropDistribution INSTANCE = new DropDistribution();
 
         @Override
@@ -1124,7 +1143,7 @@ public interface TableChange {
      * </pre>
      */
     @PublicEvolving
-    class SetOption implements TableChange {
+    class SetOption implements CatalogTableChange {
 
         private final String key;
         private final String value;
@@ -1177,7 +1196,7 @@ public interface TableChange {
      * </pre>
      */
     @PublicEvolving
-    class ResetOption implements TableChange {
+    class ResetOption implements CatalogTableChange {
 
         private final String key;
 
@@ -1279,6 +1298,13 @@ public interface TableChange {
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Catalog table change
+    // --------------------------------------------------------------------------------------------
+    /** {@link CatalogTableChange} represents the modification of the CatalogTable. */
+    @PublicEvolving
+    interface CatalogTableChange extends TableChange {}
+
     // --------------------------------------------------------------------------------------------
     // Materialized table change
     // --------------------------------------------------------------------------------------------
@@ -1374,4 +1400,41 @@ public interface TableChange {
                     + '}';
         }
     }
+
+    /** A table change to modify the definition query. */
+    @PublicEvolving
+    class ModifyDefinitionQuery implements MaterializedTableChange {
+
+        private final String definitionQuery;
+
+        public ModifyDefinitionQuery(String definitionQuery) {
+            this.definitionQuery = definitionQuery;
+        }
+
+        public String getDefinitionQuery() {
+            return definitionQuery;
+        }
+
+        @Override
+        public String toString() {
+            return "ModifyDefinitionQuery{" + "definitionQuery='" + definitionQuery + '\'' + '}';
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ModifyDefinitionQuery that = (ModifyDefinitionQuery) o;
+            return Objects.equals(definitionQuery, that.definitionQuery);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(definitionQuery);
+        }
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
new file mode 100644
index 00000000000..048e2e4d753
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableChange.MaterializedTableChange;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+
+import org.apache.calcite.sql.SqlNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+
+/** A converter for {@link SqlAlterMaterializedTableAsQuery}. */
+public class SqlAlterMaterializedTableAsQueryConverter
+        implements SqlNodeConverter<SqlAlterMaterializedTableAsQuery> {
+
+    @Override
+    public Operation convertSqlNode(
+            SqlAlterMaterializedTableAsQuery sqlAlterMaterializedTableAsQuery,
+            ConvertContext context) {
+        ObjectIdentifier identifier = resolveIdentifier(sqlAlterMaterializedTableAsQuery, context);
+
+        // Validate and extract schema from query
+        String originalQuery =
+                context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery());
+        SqlNode validatedQuery =
+                context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery());
+        // The LATERAL operator was eliminated during sql validation, thus the unparsed SQL
+        // does not contain LATERAL which is problematic,
+        // the issue was resolved in CALCITE-4077
+        // (always treat the table function as implicitly LATERAL).
+        String definitionQuery = context.expandSqlIdentifiers(originalQuery);
+        PlannerQueryOperation queryOperation =
+                new PlannerQueryOperation(
+                        context.toRelRoot(validatedQuery).project(), () -> originalQuery);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                getResolvedMaterializedTable(context, identifier);
+        List<Column> addedColumns =
+                validateAndExtractNewColumns(
+                        oldTable.getResolvedSchema(), queryOperation.getResolvedSchema());
+
+        // Build new materialized table and apply changes
+        CatalogMaterializedTable updatedTable =
+                buildUpdatedMaterializedTable(oldTable, addedColumns, definitionQuery);
+        List<MaterializedTableChange> tableChanges = new ArrayList<>();
+        addedColumns.forEach(column -> tableChanges.add(TableChange.add(column)));
+        tableChanges.add(TableChange.modifyDefinitionQuery(definitionQuery));
+
+        return new AlterMaterializedTableAsQueryOperation(identifier, tableChanges, updatedTable);
+    }
+
+    private ObjectIdentifier resolveIdentifier(
+            SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery, ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlAlterTableAsQuery.fullTableName());
+        return context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+    }
+
+    private ResolvedCatalogMaterializedTable getResolvedMaterializedTable(
+            ConvertContext context, ObjectIdentifier identifier) {
+        ResolvedCatalogBaseTable<?> baseTable =
+                context.getCatalogManager().getTableOrError(identifier).getResolvedTable();
+        if (MATERIALIZED_TABLE != baseTable.getTableKind()) {
+            throw new ValidationException(
+                    "Only materialized table support modify definition query.");
+        }
+        return (ResolvedCatalogMaterializedTable) baseTable;
+    }
+
+    private CatalogMaterializedTable buildUpdatedMaterializedTable(
+            ResolvedCatalogMaterializedTable oldTable,
+            List<Column> addedColumns,
+            String definitionQuery) {
+
+        Schema.Builder newSchemaBuilder =
+                Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema());
+        addedColumns.forEach(col -> newSchemaBuilder.column(col.getName(), col.getDataType()));
+
+        return CatalogMaterializedTable.newBuilder()
+                .schema(newSchemaBuilder.build())
+                .comment(oldTable.getComment())
+                .partitionKeys(oldTable.getPartitionKeys())
+                .options(oldTable.getOptions())
+                .definitionQuery(definitionQuery)
+                .freshness(oldTable.getDefinitionFreshness())
+                .logicalRefreshMode(oldTable.getLogicalRefreshMode())
+                .refreshMode(oldTable.getRefreshMode())
+                .refreshStatus(oldTable.getRefreshStatus())
+                .refreshHandlerDescription(oldTable.getRefreshHandlerDescription().orElse(null))
+                .serializedRefreshHandler(oldTable.getSerializedRefreshHandler())
+                .build();
+    }
+
+    private List<Column> validateAndExtractNewColumns(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        List<Column> newAddedColumns = new ArrayList<>();
+        int originalColumnSize = oldSchema.getColumns().size();
+        int newColumnSize = newSchema.getColumns().size();
+
+        if (originalColumnSize > newColumnSize) {
+            throw new ValidationException(
+                    String.format(
+                            "Failed to modify query because drop column is unsupported. "
+                                    + "When modifying a query, you can only append new columns at the end of original schema. "
+                                    + "The original schema has %d columns, but the newly derived schema from the query has %d columns.",
+                            originalColumnSize, newColumnSize));
+        }
+
+        for (int i = 0; i < oldSchema.getColumns().size(); i++) {
+            Column oldColumn = oldSchema.getColumns().get(i);
+            Column newColumn = newSchema.getColumns().get(i);
+            if (!oldColumn.equals(newColumn)) {
+                throw new ValidationException(
+                        String.format(
+                                "When modifying the query of a materialized table, "
+                                        + "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n"
+                                        + "Column mismatch at position %d: Original column is [%s], but new column is [%s].",
+                                i, oldColumn, newColumn));
+            }
+        }
+
+        for (int i = oldSchema.getColumns().size(); i < newSchema.getColumns().size(); i++) {
+            Column newColumn = newSchema.getColumns().get(i);
+            newAddedColumns.add(newColumn.copy(newColumn.getDataType().nullable()));
+        }
+
+        return newAddedColumns;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index 25ab375469f..87570ee9c3c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -63,6 +63,7 @@ public class SqlNodeConverters {
         register(new SqlAlterMaterializedTableRefreshConverter());
         register(new SqlAlterMaterializedTableSuspendConverter());
         register(new SqlAlterMaterializedTableResumeConverter());
+        register(new SqlAlterMaterializedTableAsQueryConverter());
         register(new SqlDropMaterializedTableConverter());
         register(new SqlShowTablesConverter());
         register(new SqlShowViewsConverter());
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index ff3cf0c0849..a3b659f335b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -28,9 +28,12 @@ import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
 import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
 import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;
 import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
@@ -45,7 +48,9 @@ import org.junit.jupiter.api.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -72,6 +77,25 @@ public class SqlMaterializedTableNodeToOperationConverterTest
         final CatalogTable catalogTable =
                 CatalogTable.of(tableSchema, "", Arrays.asList("b", "c"), options);
         catalog.createTable(path3, catalogTable, true);
+
+        // create materialized table
+        final String sql =
+                "CREATE MATERIALIZED TABLE base_mtbl (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+                        + ")\n"
+                        + "COMMENT 'materialized table comment'\n"
+                        + "PARTITIONED BY (a, d)\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        final ObjectPath path4 = new ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl");
+
+        CreateMaterializedTableOperation operation = (CreateMaterializedTableOperation) parse(sql);
+        catalog.createTable(path4, operation.getCatalogMaterializedTable(), true);
     }
 
     @Test
@@ -391,6 +415,125 @@ public class SqlMaterializedTableNodeToOperationConverterTest
                 .isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME WITH (k1: [v1])");
     }
 
+    @Test
+    void testAlterMaterializedTableAsQuery() throws TableNotExistException {
+        String sql =
+                "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, d as e, cast('123' as string) as f FROM t3";
+        Operation operation = parse(sql);
+
+        assertThat(operation).isInstanceOf(AlterMaterializedTableAsQueryOperation.class);
+
+        AlterMaterializedTableAsQueryOperation op =
+                (AlterMaterializedTableAsQueryOperation) operation;
+        assertThat(op.getTableChanges())
+                .isEqualTo(
+                        Arrays.asList(
+                                TableChange.add(
+                                        Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                                TableChange.add(
+                                        Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                                TableChange.modifyDefinitionQuery(
+                                        "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+                                                + "FROM `builtin`.`default`.`t3`")));
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+                                + "FROM `builtin`.`default`.`t3`");
+
+        // the new table should differ from the old table only in schema and definition query.
+        CatalogMaterializedTable oldTable =
+                (CatalogMaterializedTable)
+                        catalog.getTable(
+                                new ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl"));
+        CatalogMaterializedTable newTable = op.getNewMaterializedTable();
+
+        assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema());
+        assertThat(oldTable.getUnresolvedSchema().getPrimaryKey())
+                .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey());
+        assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs())
+                .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs());
+        assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());
+        assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
+        assertThat(oldTable.getRefreshMode()).isEqualTo(newTable.getRefreshMode());
+        assertThat(oldTable.getRefreshStatus()).isEqualTo(newTable.getRefreshStatus());
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isEqualTo(newTable.getSerializedRefreshHandler());
+
+        List<Schema.UnresolvedColumn> addedColumn =
+                newTable.getUnresolvedSchema().getColumns().stream()
+                        .filter(
+                                column ->
+                                        !oldTable.getUnresolvedSchema()
+                                                .getColumns()
+                                                .contains(column))
+                        .collect(Collectors.toList());
+        // added column should be a nullable column.
+        assertThat(addedColumn)
+                .isEqualTo(
+                        Arrays.asList(
+                                new Schema.UnresolvedPhysicalColumn(
+                                        "e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                                new Schema.UnresolvedPhysicalColumn(
+                                        "f", DataTypes.VARCHAR(Integer.MAX_VALUE))));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryWithConflictColumnName() {
+        String sql5 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, c as a FROM t3";
+        AlterMaterializedTableAsQueryOperation sqlAlterMaterializedTableAsQuery =
+                (AlterMaterializedTableAsQueryOperation) parse(sql5);
+
+        assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges())
+                .isEqualTo(
+                        Arrays.asList(
+                                TableChange.add(Column.physical("a0", DataTypes.INT())),
+                                TableChange.modifyDefinitionQuery(
+                                        "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`c` AS `a`\n"
+                                                + "FROM `builtin`.`default`.`t3`")));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryWithUnsupportedColumnChange() {
+        // 1. delete existing column
+        String sql1 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b FROM t3";
+        assertThatThrownBy(() -> parse(sql1))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "Failed to modify query because drop column is unsupported. When modifying a query, you can only append new columns at the end of original schema. The original schema has 4 columns, but the newly derived schema from the query has 2 columns.");
+        // 2. swap column position
+        String sql2 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, d, c FROM t3";
+        assertThatThrownBy(() -> parse(sql2))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "When modifying the query of a materialized table, currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n"
+                                + "Column mismatch at position 2: Original column is [`c` INT], but new column is [`d` STRING].");
+        // 3. change existing column type
+        String sql3 =
+                "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, cast(d as int) as d FROM t3";
+        assertThatThrownBy(() -> parse(sql3))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "When modifying the query of a materialized table, currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n"
+                                + "Column mismatch at position 3: Original column is [`d` STRING], but new column is [`d` INT].");
+        // 4. change existing column nullability
+        String sql4 =
+                "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, cast('d' as string) as d FROM t3";
+        assertThatThrownBy(() -> parse(sql4))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "When modifying the query of a materialized table, currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n"
+                                + "Column mismatch at position 3: Original column is [`d` STRING], but new column is [`d` STRING NOT NULL].");
+    }
+
+    @Test
+    void testAlterAlterMaterializedTableAsQueryWithCatalogTable() {
+        // t1 is a CatalogTable not a Materialized Table
+        final String sql = "ALTER MATERIALIZED TABLE t1 AS SELECT * FROM t1";
+        assertThatThrownBy(() -> parse(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("Only materialized table support modify definition query.");
+    }
+
     @Test
     void testDropMaterializedTable() {
         final String sql = "DROP MATERIALIZED TABLE mtbl1";

