lsyldliu commented on code in PR #25880:
URL: https://github.com/apache/flink/pull/25880#discussion_r1906542783


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableChange.MaterializedTableChange;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+
+import org.apache.calcite.sql.SqlNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+
+/** A converter for {@link SqlAlterMaterializedTableAsQuery}. */
+public class SqlAlterMaterializedTableAsQueryConverter
+        implements SqlNodeConverter<SqlAlterMaterializedTableAsQuery> {
+
+    @Override
+    public Operation convertSqlNode(
+            SqlAlterMaterializedTableAsQuery sqlAlterMaterializedTableAsQuery,
+            ConvertContext context) {
+        ObjectIdentifier identifier = resolveIdentifier(sqlAlterMaterializedTableAsQuery, context);
+
+        // Validate and extract schema from query
+        String originalQuery =
+                context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery());
+        SqlNode validatedQuery =
+                context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery());
+        // The LATERAL operator was eliminated during sql validation, thus the unparsed SQL
+        // does not contain LATERAL which is problematic,
+        // the issue was resolved in CALCITE-4077
+        // (always treat the table function as implicitly LATERAL).
+        String definitionQuery = context.expandSqlIdentifiers(originalQuery);
+        PlannerQueryOperation queryOperation =
+                new PlannerQueryOperation(
+                        context.toRelRoot(validatedQuery).project(), () -> originalQuery);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                getResolvedMaterializedTable(context, identifier);
+        List<Column> addedColumns =
+                validateAndExtractNewColumns(
+                        oldTable.getResolvedSchema(), queryOperation.getResolvedSchema());
+
+        // Build new materialized table and apply changes
+        CatalogMaterializedTable updatedTable =
+                buildUpdatedMaterializedTable(oldTable, addedColumns, definitionQuery);
+        List<MaterializedTableChange> tableChanges = new ArrayList<>();
+        addedColumns.forEach(column -> tableChanges.add(TableChange.add(column)));
+        tableChanges.add(TableChange.modifyDefinitionQuery(definitionQuery));
+
+        return new AlterMaterializedTableAsQueryOperation(identifier, tableChanges, updatedTable);
+    }
+
+    private ObjectIdentifier resolveIdentifier(
+            SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery, ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlAlterTableAsQuery.fullTableName());
+        return context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+    }
+
+    private ResolvedCatalogMaterializedTable getResolvedMaterializedTable(
+            ConvertContext context, ObjectIdentifier identifier) {
+        ResolvedCatalogBaseTable<?> baseTable =
+                context.getCatalogManager().getTableOrError(identifier).getResolvedTable();
+        if (MATERIALIZED_TABLE != baseTable.getTableKind()) {
+            throw new ValidationException(
+                    "Only materialized table support modify definition query.");
+        }
+        return (ResolvedCatalogMaterializedTable) baseTable;
+    }
+
+    private CatalogMaterializedTable buildUpdatedMaterializedTable(
+            ResolvedCatalogMaterializedTable oldTable,
+            List<Column> addedColumns,
+            String definitionQuery) {
+
+        Schema.Builder newSchemaBuilder =
+                Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema());
+        addedColumns.forEach(col -> newSchemaBuilder.column(col.getName(), col.getDataType()));
+
+        return CatalogMaterializedTable.newBuilder()
+                .schema(newSchemaBuilder.build())
+                .comment(oldTable.getComment())
+                .partitionKeys(oldTable.getPartitionKeys())
+                .options(oldTable.getOptions())
+                .definitionQuery(definitionQuery)
+                .freshness(oldTable.getDefinitionFreshness())
+                .logicalRefreshMode(oldTable.getLogicalRefreshMode())
+                .refreshMode(oldTable.getRefreshMode())
+                .refreshStatus(oldTable.getRefreshStatus())
+                .refreshHandlerDescription(oldTable.getRefreshHandlerDescription().orElse(null))
+                .serializedRefreshHandler(oldTable.getSerializedRefreshHandler())
+                .build();
+    }
+
+    private List<Column> validateAndExtractNewColumns(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        List<Column> newAddedColumns = new ArrayList<>();
+        int originalColumnSize = oldSchema.getColumns().size();
+        int newColumnSize = newSchema.getColumns().size();
+
+        if (originalColumnSize > newColumnSize) {
+            throw new ValidationException(
+                    String.format(
+                            "Query modification failed: Column deletion is not allowed. "

Review Comment:
   Failed to modify query because drop column is unsupported. When modifying a query, you can only append new columns at the end of the original schema. The original schema has %d columns, but the newly derived schema from the query has %d columns.
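
   For reference, wired into the existing check it would read roughly like this (a sketch reusing the variable names from the diff above):

   ```java
   // Sketch of the suggested wording in the existing size check.
   if (originalColumnSize > newColumnSize) {
       throw new ValidationException(
               String.format(
                       "Failed to modify query because drop column is unsupported. "
                               + "When modifying a query, you can only append new columns at the end of the original schema. "
                               + "The original schema has %d columns, but the newly derived schema from the query has %d columns.",
                       originalColumnSize, newColumnSize));
   }
   ```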



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java:
##########
@@ -99,9 +88,13 @@ private String toString(TableChange.MaterializedTableChange tableChange) {
             return String.format(
                     "  MODIFY REFRESH HANDLER DESCRIPTION TO '%s'",
                     refreshHandler.getRefreshHandlerDesc());
+        } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
+            TableChange.ModifyDefinitionQuery definitionQuery =
+                    (TableChange.ModifyDefinitionQuery) tableChange;
+            return String.format(
+                    " MODIFY DEFINITION QUERY TO '%s'", 
definitionQuery.getDefinitionQuery());
         } else {
-            throw new UnsupportedOperationException(
-                    String.format("Unknown materialized table change: %s.", 
tableChange));
+            return tableChange.toString();

Review Comment:
   I think if the MaterializedTableChange subclass is also a TableChange subclass, we can utilize the AlterTableChangeOperation.toString method to print it. Otherwise, throw an UnsupportedOperationException.
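
   Roughly what I have in mind (a sketch: `isGenericTableChange` is a hypothetical helper, and it assumes `AlterTableChangeOperation`'s per-change formatter is made accessible, e.g. as a package-private static):

   ```java
   } else if (isGenericTableChange(tableChange)) {
       // Reuse the generic per-change formatting from AlterTableChangeOperation
       // instead of silently falling back to Object.toString().
       return AlterTableChangeOperation.toString(tableChange);
   } else {
       throw new UnsupportedOperationException(
               String.format("Unknown materialized table change: %s.", tableChange));
   }
   ```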



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java:
##########
@@ -0,0 +1,160 @@
+    private List<Column> validateAndExtractNewColumns(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        List<Column> newAddedColumns = new ArrayList<>();
+        int originalColumnSize = oldSchema.getColumns().size();
+        int newColumnSize = newSchema.getColumns().size();
+
+        if (originalColumnSize > newColumnSize) {
+            throw new ValidationException(
+                    String.format(
+                            "Query modification failed: Column deletion is not allowed. "
+                                    + "When modifying a query, you can only append new columns at the end. "
+                                    + "Original schema has %d columns, but new schema has %d columns.",
+                            originalColumnSize, newColumnSize));
+        }
+
+        for (int i = 0; i < oldSchema.getColumns().size(); i++) {
+            Column oldColumn = oldSchema.getColumns().get(i);
+            Column newColumn = newSchema.getColumns().get(i);
+            if (!oldColumn.equals(newColumn)) {
+                throw new ValidationException(
+                        String.format(
+                                "When modifying the query of a materialized table, currently only support appending columns at the end of the schema, dropping, renaming and reordering columns are not supported.\n"

Review Comment:
   ```suggestion
                                   "When modifying the query of a materialized table, currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n"
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java:
##########
@@ -391,6 +451,127 @@ void testAlterMaterializedTableResume() {
                 .isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 
RESUME WITH (k1: [v1])");
     }
 
+    @Test
+    void testAlterMaterializedTableAsQuery() throws TableNotExistException {
+        String sql =
+                "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, d as e, cast('123' as string) as f FROM t3";
+        Operation operation = parse(sql);
+
+        assertThat(operation).isInstanceOf(AlterMaterializedTableAsQueryOperation.class);
+
+        AlterMaterializedTableAsQueryOperation op =
+                (AlterMaterializedTableAsQueryOperation) operation;
+        assertThat(op.getTableChanges())
+                .isEqualTo(
+                        Arrays.asList(
+                                TableChange.add(
+                                        Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                                TableChange.add(
+                                        Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                                TableChange.modifyDefinitionQuery(
+                                        "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+                                                + "FROM `builtin`.`default`.`t3`")));
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS 
SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS 
STRING) AS `f`\n"
+                                + "FROM `builtin`.`default`.`t3`");
+
+        // new table only difference schema & definition query with old table.
+        CatalogMaterializedTable oldTable =
+                (CatalogMaterializedTable)
+                        catalog.getTable(
+                                new ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl"));
+        CatalogMaterializedTable newTable = op.getNewMaterializedTable();
+
+        assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema());
+        assertThat(oldTable.getUnresolvedSchema().getPrimaryKey())
+                .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey());
+        assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs())
+                .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs());
+        assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());
+        assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
+        assertThat(oldTable.getRefreshMode()).isEqualTo(newTable.getRefreshMode());
+        assertThat(oldTable.getRefreshStatus()).isEqualTo(newTable.getRefreshStatus());
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isEqualTo(newTable.getSerializedRefreshHandler());
+
+        List<Schema.UnresolvedColumn> addedColumn =
+                newTable.getUnresolvedSchema().getColumns().stream()
+                        .filter(
+                                column ->
+                                        !oldTable.getUnresolvedSchema()
+                                                .getColumns()
+                                                .contains(column))
+                        .collect(Collectors.toList());
+        // added column should be a nullable column.
+        assertThat(addedColumn)
+                .isEqualTo(
+                        Arrays.asList(
+                                new Schema.UnresolvedPhysicalColumn(
+                                        "e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                                new Schema.UnresolvedPhysicalColumn(
+                                        "f", DataTypes.VARCHAR(Integer.MAX_VALUE))));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryWithConflictColumnName() {
+        String sql5 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, c as a FROM t3";

Review Comment:
   In the next PR, we should explicitly forbid the duplicated column name case.
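
   Something along these lines could work for the follow-up (a sketch; the helper name `validateNoDuplicateColumnNames` is hypothetical, and it would need `java.util.Set`/`HashSet` imports):

   ```java
   // Hypothetical pre-check for the follow-up PR: reject duplicate column names
   // in the schema derived from the new query.
   private void validateNoDuplicateColumnNames(ResolvedSchema newSchema) {
       Set<String> seenNames = new HashSet<>();
       for (Column column : newSchema.getColumns()) {
           if (!seenNames.add(column.getName())) {
               throw new ValidationException(
                       String.format(
                               "Column name '%s' appears more than once in the schema derived from the new query.",
                               column.getName()));
           }
       }
   }
   ```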



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +809,160 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable oldMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, tableIdentifier);
+
+        if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) {
+            // directly apply the alter operation
+            AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, op.getTableChanges(), op.getNewMaterializedTable());
+            return operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+        }
+
+        if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
+                == oldMaterializedTable.getRefreshStatus()) {
+            // 1. suspend the materialized table
+            CatalogMaterializedTable suspendMaterializedTable =
+                    suspendContinuousRefreshJob(
+                            operationExecutor, handle, tableIdentifier, oldMaterializedTable);
+
+            // 2. alter materialized table schema & query definition
+            CatalogMaterializedTable updatedMaterializedTable =
+                    op.getNewMaterializedTable()
+                            .copy(
+                                    suspendMaterializedTable.getRefreshStatus(),
+                                    suspendMaterializedTable
+                                            .getRefreshHandlerDescription()
+                                            .orElse(null),
+                                    suspendMaterializedTable.getSerializedRefreshHandler());
+            AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, op.getTableChanges(), updatedMaterializedTable);
+            operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+
+            // 3. resume the materialized table
+            try {
+                executeContinuousRefreshJob(
+                        operationExecutor,
+                        handle,
+                        updatedMaterializedTable,
+                        tableIdentifier,
+                        Collections.emptyMap(),
+                        Optional.empty());
+            } catch (Exception e) {
+                // Roll back the changes to the materialized table and restore the continuous
+                // refresh job
+                LOG.warn(
+                        "Failed to resume the continuous refresh job for 
materialized table {}, rollback the alter materialized table as query 
operation.",
+                        tableIdentifier,
+                        e);
+
+                AlterMaterializedTableChangeOperation rollbackChangeOperation =
+                        generateRollbackAlterMaterializedTableOperation(
+                                suspendMaterializedTable, alterMaterializedTableChangeOperation);
+                operationExecutor.callExecutableOperation(handle, rollbackChangeOperation);
+
+                ContinuousRefreshHandler continuousRefreshHandler =

Review Comment:
   I think the ContinuousRefreshHandler should come from `suspendMaterializedTable` or `updatedMaterializedTable`, because we should resume the refresh job from its savepoint.
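
   i.e., roughly (a sketch; the deserialization helper and its exact signature are assumptions here):

   ```java
   // Sketch: rebuild the handler from the suspended table's persisted state so the
   // resume path restarts the continuous refresh job from its savepoint.
   ContinuousRefreshHandler continuousRefreshHandler =
           deserializeContinuousHandler(
                   suspendMaterializedTable.getSerializedRefreshHandler());
   ```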



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +809,160 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable oldMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, tableIdentifier);
+
+        if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) {

Review Comment:
   I think we forgot an important piece of logic: explaining the SELECT query and ensuring it compiles successfully in the corresponding refresh mode when creating or modifying a query. We should do more pre-checks before creating or altering a query. Can you help create a Jira issue to track it?
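
   A rough sketch of the kind of pre-check I mean, assuming `refreshStatement` is built via the existing `getRefreshStatement(...)` helper and that `explainStatement` (hypothetical) runs the planner without executing:

   ```java
   // Hypothetical pre-check: compile the refresh statement for the target refresh
   // mode before persisting the alter, so planner errors surface early instead of
   // only when the refresh job is submitted.
   try {
       operationExecutor.explainStatement(handle, refreshStatement);
   } catch (Exception e) {
       throw new ValidationException(
               "The modified definition query cannot be compiled in the current refresh mode.", e);
   }
   ```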



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to